Command line client for automated backups
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

419 lines
15 KiB

  1. #!/usr/bin/env python3
  2. import argparse
  3. from configparser import ConfigParser
  4. from urllib.parse import urlparse
  5. from os.path import normpath, join, exists
  6. from os import chmod, chown, stat, environ
  7. from enum import Enum
  8. import subprocess
  9. from requests import get, put, head
  10. from threading import Thread
  11. SSH_KEY_PATH = environ["DATADB_KEYPATH"] if "DATADB_KEYPATH" in environ else '/root/.ssh/datadb.key'
  12. RSYNC_DEFAULT_ARGS = ['rsync', '-avzr', '--exclude=.datadb.lock', '--whole-file', '--one-file-system', '--delete']
  13. SSH_CMD = 'ssh -i {} -p {} -o StrictHostKeyChecking=no'
  14. class SyncStatus(Enum):
  15. "Data is on local disk"
  16. DATA_AVAILABLE = 1
  17. "Data is not on local disk"
  18. DATA_MISSING = 2
  19. # Requests will call tell() on the file-like stdout stream if the tell attribute exists. However subprocess'
  20. # stdout stream (_io.BufferedReader) does not support this (raises OSError: [Errno 29] Illegal seek).
  21. # If the tell attribute is missing, requests will fall back to simply iterating on the file-like object,
  22. # so, we support only the iterable interface
  23. class WrappedStdout(object):
  24. BUFFSIZE = 256 * 1024
  25. def __init__(self, stdout):
  26. self.stdout = stdout
  27. def __iter__(self):
  28. return self
  29. def __next__(self):
  30. data = self.stdout.read(self.BUFFSIZE)
  31. if not data:
  32. raise StopIteration()
  33. return data
  34. def close(self):
  35. self.stdout.close()
def restore(api_url, profile, conf, force=False):
    """
    Restore data from datadb.

    :param api_url: base URL of the datadb HTTP API (with trailing slash)
    :param profile: name of the datasource on the backup server
    :param conf: this profile's config dict (needs "uri" and "dir")
    :param force: restore even if the local lock file says data is present
    :raises Exception: if data already exists locally (without force), or the
        archive download/extract fails
    """
    # Sanity check: If the lockfile exists we assume the data is already there, so we wouldn't want to call rsync again
    # as it would wipe out local changes. This can be overridden with --force
    if not ((status(profile, conf) == SyncStatus.DATA_MISSING) or force):
        raise Exception("Data already exists (Use --force?)")
    # Remember ownership/mode of the local dir so they can be reapplied after extraction
    original_perms = stat(conf["dir"])
    dest = urlparse(conf["uri"])
    # Cheap HEAD probe so we can bail out before launching any transfer
    status_code = head(api_url + 'get_backup', params={'proto': dest.scheme, 'name': profile}).status_code
    if status_code == 404:
        print("Connected to datadb, but datasource '{}' doesn't exist. Exiting".format(profile))
        # TODO: special exit code >1 to indicate this?
        return
    if dest.scheme == 'rsync':
        args = RSYNC_DEFAULT_ARGS[:]
        args += ['-e', SSH_CMD.format(SSH_KEY_PATH, dest.port or 22)]
        # Request backup server to prepare the backup, the returned dir is what we sync from
        rsync_path = get(api_url + 'get_backup', params={'proto': 'rsync', 'name': profile}).text.rstrip()
        # Add rsync source path (trailing slash: sync directory contents, not the dir itself)
        args.append('nexus@{}:{}'.format(dest.hostname, normpath(rsync_path) + '/'))
        # Add local dir (destination)
        args.append(normpath(conf["dir"]) + '/')
        print("Rsync restore call: {}".format(' '.join(args)))
        subprocess.check_call(args)
    elif dest.scheme == 'archive':
        # http request backup server, download tarball, stream it into tar for unpacking
        args_curl = ['curl', '-s', '-v', '-XGET', '{}get_backup?proto=archive&name={}'.format(api_url, profile)]
        # unpack into the local dir
        args_tar = [get_tarcmd(), 'zxv', '-C', normpath(conf["dir"]) + '/']
        print("Tar restore call: {} | {}".format(' '.join(args_curl), ' '.join(args_tar)))
        dl = subprocess.Popen(args_curl, stdout=subprocess.PIPE)
        extract = subprocess.Popen(args_tar, stdin=dl.stdout)
        dl.wait()
        extract.wait()
        # TODO: convert to pure python?
        if dl.returncode != 0:
            raise Exception("Could not download archive")
        if extract.returncode != 0:
            raise Exception("Could not extract archive")
    # Restore original permissions on data dir (extraction may have replaced it)
    # TODO store these in conf file
    chmod(conf["dir"], original_perms.st_mode)
    chown(conf["dir"], original_perms.st_uid, original_perms.st_gid)
    # TODO apply other permissions
  83. def backup(api_url, profile, conf, force=False):
  84. """
  85. Backup data to datadb
  86. """
  87. # Sanity check: If the lockfile doesn't exist we assume the data is missing, so we wouldn't want to call rsync
  88. # again as it would wipe out the backup.
  89. if not ((status(profile, conf) == SyncStatus.DATA_AVAILABLE) or force):
  90. raise Exception("Data is missing (Use --force?)")
  91. dest = urlparse(conf["uri"])
  92. if dest.scheme == 'rsync':
  93. args = RSYNC_DEFAULT_ARGS[:]
  94. args += ['-e', SSH_CMD.format(SSH_KEY_PATH, dest.port or 22)]
  95. # args += ["--port", str(dest.port or 22)]
  96. # Excluded paths
  97. if conf["exclude"]:
  98. for exclude_path in conf["exclude"].split(","):
  99. if not exclude_path == "":
  100. args.append("--exclude")
  101. args.append(exclude_path)
  102. # Add local dir
  103. args.append(normpath(conf["dir"]) + '/')
  104. new_backup_params = {'proto': 'rsync',
  105. 'name': profile,
  106. 'keep': conf["keep"]}
  107. if conf["inplace"]:
  108. new_backup_params["inplace"] = 1
  109. # Hit backupdb via http to retreive absolute path of rsync destination of remote server
  110. rsync_path, token = get(api_url + 'new_backup', params=new_backup_params).json()
  111. # Add rsync source path
  112. args.append(normpath('nexus@{}:{}'.format(dest.hostname, rsync_path)) + '/')
  113. # print("Rsync backup call: {}".format(' '.join(args)))
  114. try:
  115. subprocess.check_call(args)
  116. except subprocess.CalledProcessError as cpe:
  117. if cpe.returncode not in [0, 24]: # ignore partial transfer due to vanishing files on our end
  118. raise
  119. # confirm completion if backup wasnt already in place
  120. if not conf["inplace"]:
  121. put(api_url + 'new_backup', params={'proto': 'rsync', 'name': profile, 'token': token,
  122. 'keep': conf["keep"]})
  123. elif dest.scheme == 'archive':
  124. # CD to local source dir
  125. # tar+gz data and stream to backup server
  126. args_tar = []
  127. if has_binary("ionice"):
  128. args_tar += ['ionice', '-c', '3']
  129. args_tar += ['nice', '-n', '19']
  130. args_tar += [get_tarcmd(),
  131. '--exclude=.datadb.lock',
  132. '--warning=no-file-changed',
  133. '--warning=no-file-removed',
  134. '--warning=no-file-ignored',
  135. '--warning=no-file-shrank']
  136. # Use pigz if available (Parallel gzip - http://zlib.net/pigz/)
  137. if has_binary("pigz"):
  138. args_tar += ["--use-compress-program", "pigz"]
  139. else:
  140. args_tar += ["-z"]
  141. # Excluded paths
  142. if conf["exclude"]:
  143. for exclude_path in conf["exclude"].split(","):
  144. if not exclude_path == "":
  145. args_tar.append("--exclude")
  146. args_tar.append(exclude_path)
  147. args_tar += ['-cv', './']
  148. tar_dir = normpath(conf["dir"]) + '/'
  149. print("Tar call in {}: {}".format(args_tar, tar_dir))
  150. tar = subprocess.Popen(args_tar, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=tar_dir)
  151. put_url = '{}new_backup?proto=archive&name={}&keep={}'.format(api_url, profile, conf["keep"])
  152. print("Putting to: {}".format(put_url))
  153. tar_errors = []
  154. error_scanner = Thread(target=scan_errors, args=(tar.stderr, tar_errors), daemon=True)
  155. error_scanner.start()
  156. upload = put(put_url, data=WrappedStdout(tar.stdout))
  157. if upload.status_code != 200:
  158. print(upload.text)
  159. raise Exception("Upload failed with code: {}".format(upload.status_code))
  160. tar.wait()
  161. error_scanner.join()
  162. if tar.returncode != 0 and len(tar_errors) > 0:
  163. raise Exception("Tar process exited with nonzero code {}. Tar errors: \n {}".
  164. format(tar.returncode, "\n ".join(tar_errors)))
  165. def scan_errors(stream, error_list):
  166. """
  167. Read and print lines from a stream, appending messages that look like errors to error_list
  168. """
  169. # Tar does not have an option to ignore file-removed errors. The warnings can be hidden but even with
  170. # --ignore-failed-read, file-removed errors cause a non-zero exit. So, hide the warnings we don't care about
  171. # using --warnings=no-xxx and scan output for unknown messages, assuming anything found is bad.
  172. for line in stream:
  173. line = line.decode("UTF-8").strip()
  174. if not line.startswith("./"):
  175. if line not in error_list:
  176. error_list.append(line)
  177. print(line)
  178. def status(profile, conf):
  179. """
  180. Check status of local dir - if the lock file is in place, we assume the data is there
  181. """
  182. lockfile = join(conf["dir"], '.datadb.lock')
  183. if exists(lockfile):
  184. return SyncStatus.DATA_AVAILABLE
  185. return SyncStatus.DATA_MISSING
  186. def shell_exec(cmd, workdir='/tmp/'):
  187. """
  188. Execute a command in shell, wait for exit.
  189. """
  190. print("Calling: {}".format(cmd))
  191. subprocess.Popen(cmd, shell=True, cwd=workdir).wait()
  192. def get_tarcmd():
  193. return "gtar" if has_binary("gtar") else "tar"
  194. def has_binary(name):
  195. """
  196. Check if the passed command is available
  197. :return: boolean
  198. """
  199. try:
  200. subprocess.check_call(['which', name], stdout=subprocess.DEVNULL)
  201. except subprocess.CalledProcessError:
  202. return False
  203. return True
  204. def main():
  205. """
  206. Excepts a config file at /etc/datadb.ini. Example:
  207. ----------------------------
  208. [gyfd]
  209. uri=
  210. dir=
  211. keep=
  212. auth=
  213. restore_preexec=
  214. restore_postexec=
  215. export_preexec=
  216. export_postexec=
  217. exclude=
  218. ----------------------------
  219. Each [section] defines one backup task.
  220. Fields:
  221. *uri*: Destination/source for this instance's data. Always fits the following format:
  222. <procotol>://<server>/<backup name>
  223. Valid protocols:
  224. rsync - rsync executed over SSH. The local dir will be synced with the remote backup dir using rsync.
  225. archive - tar archives transported over HTTP. The local dir will be tarred and PUT to the backup server's
  226. remote dir via http.
  227. *dir*: Local dir for this backup
  228. *keep*: Currently unused. Number of historical copies to keep on remote server
  229. *auth*: Currently unused. Username:password string to use while contacting the datadb via HTTP.
  230. *restore_preexec*: Shell command to exec before pulling/restoring data
  231. *restore_postexec*: Shell command to exec after pulling/restoring data
  232. *export_preexec*: Shell command to exec before pushing data
  233. *export_postexec*: Shell command to exec after pushing data
  234. *exclude*: if the underlying transport method supports excluding paths, a comma separated list of paths to exclude.
  235. Applies to backup operations only.
  236. *inplace*: rsync only. if enabled, the server will keep only a single copy that you will rsync over. intended for
  237. single copies of LARGE datasets. overrides "keep".
  238. """
  239. required_conf_params = ['dir', 'uri']
  240. conf_params = {'export_preexec': None,
  241. 'exclude': None,
  242. 'keep': 5,
  243. 'restore_preexec': None,
  244. 'restore_postexec': None,
  245. 'auth': '',
  246. 'export_postexec': None,
  247. 'inplace': False}
  248. conf_path = environ["DATADB_CONF"] if "DATADB_CONF" in environ else "/etc/datadb.ini"
  249. # Load profiles
  250. config = ConfigParser()
  251. config.read(conf_path)
  252. config = {section: {k: config[section][k] for k in config[section]} for section in config.sections()}
  253. global_config = {}
  254. for conf_k, conf_dict in config.items():
  255. if conf_k == "_backupdb":
  256. global_config = conf_dict
  257. continue
  258. for expect_param, expect_default in conf_params.items():
  259. if expect_param not in conf_dict.keys():
  260. conf_dict[expect_param] = expect_default
  261. for expect_param in required_conf_params:
  262. if expect_param not in conf_dict.keys():
  263. raise Exception("Required parameter {} missing for profile {}".format(expect_param, conf_k))
  264. parser = argparse.ArgumentParser(description="Backupdb Agent depends on config: /etc/datadb.ini")
  265. parser.add_argument('-f', '--force', default=False, action='store_true',
  266. help='force restore operation if destination data already exists')
  267. parser.add_argument('--http-api', help="http endpoint", default=environ.get('DATADB_HTTP_API'))
  268. parser.add_argument('-n', '--no-exec', default=False, action='store_true', help='don\'t run pre/post-exec commands')
  269. parser.add_argument('-b', '--no-pre-exec', default=False, action='store_true', help='don\'t run pre-exec commands')
  270. parser.add_argument('-m', '--no-post-exec', default=False, action='store_true',
  271. help='don\'t run post-exec commands')
  272. parser.add_argument('profile', type=str, choices=config.keys(), help='Profile to restore')
  273. # parser.add_argument('-i', '--identity',
  274. # help='Ssh keyfile to use', type=str, default='/root/.ssh/datadb.key')
  275. # parser.add_argument('-r', '--remote',
  276. # help='Remote server (rsync://...)', type=str, required=True)
  277. # parser.add_argument('-l', '--local_dir',
  278. # help='Local path', type=str, required=True)
  279. subparser_modes = parser.add_subparsers(dest='mode', help='modes (only "rsync")')
  280. subparser_modes.add_parser('backup', help='backup to datastore')
  281. subparser_modes.add_parser('restore', help='restore from datastore')
  282. subparser_modes.add_parser('status', help='get info for profile')
  283. args = parser.parse_args()
  284. if args.http_api:
  285. api = args.http_api
  286. else:
  287. api = global_config.get("http_api", None)
  288. if not api:
  289. parser.error("--http-api is requried")
  290. if args.no_exec:
  291. args.no_pre_exec = True
  292. args.no_post_exec = True
  293. if args.mode == 'restore':
  294. if not args.no_pre_exec and config[args.profile]['restore_preexec']:
  295. shell_exec(config[args.profile]['restore_preexec'])
  296. restore(api, args.profile, config[args.profile], force=args.force)
  297. if not args.no_post_exec and config[args.profile]['restore_postexec']:
  298. shell_exec(config[args.profile]['restore_postexec'])
  299. elif args.mode == 'backup':
  300. if not args.no_pre_exec and config[args.profile]['export_preexec']:
  301. shell_exec(config[args.profile]['export_preexec'])
  302. try:
  303. backup(api, args.profile, config[args.profile], force=args.force)
  304. finally:
  305. if not args.no_post_exec and config[args.profile]['export_postexec']:
  306. shell_exec(config[args.profile]['export_postexec'])
  307. elif args.mode == 'status':
  308. info = status(args.profile, config[args.profile])
  309. print(SyncStatus(info))
  310. else:
  311. parser.print_usage()
  312. if __name__ == '__main__':
  313. main()