docker-artifact/repobot/provider.py

227 lines
7.9 KiB
Python

import os
import shutil
from repobot.common import plist, pmap
from jinja2 import Environment, FileSystemLoader, select_autoescape
import cherrypy
class DuplicateException(Exception):
pass
class PkgProvider(object):
def __init__(self, db, repo, datadir):
"""
Base package provider class
"""
self.db = db
self.repo = repo
self.dir = datadir
def render(self):
"""
Respond to requests to browse the repo
"""
raise NotImplementedError()
def add_package(self, pkobj, fname, fobj, params):
"""
Add a package to the repo
"""
raise NotImplementedError()
def get_path(self, pkgobj, fname):
"""
Get the path to a package file in the repo
"""
raise NotImplementedError()
class PyPiProvider(PkgProvider):
def add_package(self, pkgobj, fname, fobj, params):
if "files" not in pkgobj.data:
pkgobj.data["files"] = plist()
if fname in pkgobj.data["files"]:
raise DuplicateException("File {} already in package {}-{}".format(fname, pkgobj.name, pkgobj.version))
pkgdir = os.path.join(self.dir, pkgobj.name)
os.makedirs(pkgdir, exist_ok=True)
# TODO handle duplicate files better
pkgfilepath = os.path.join(pkgdir, fname)
with open(pkgfilepath, "wb") as fdest:
shutil.copyfileobj(fobj, fdest)
pkgobj.data["files"].append(fname)
def browse(self, args):
tpl = Environment(loader=FileSystemLoader("templates"), autoescape=select_autoescape(['html', 'xml']))
if len(args) == 0: # repo root
return tpl.get_template("pypi/root.html"). \
render(reponame=self.repo.name,
packages=self.repo.packages.keys())
elif len(args) == 1: # single module dir
files = []
if args[0] not in self.repo.packages:
raise cherrypy.HTTPError(404, 'Invalid package')
for _, version in self.repo.packages[args[0]].items():
files += version.data["files"]
return tpl.get_template("pypi/project.html"). \
render(reponame=self.repo.name,
modulename=args[0],
files=files)
elif len(args) == 2: # fetch file
fpath = os.path.join(self.dir, args[0], args[1])
return cherrypy.lib.static.serve_file(os.path.abspath(fpath), "application/octet-stream")
def get_path(self, pkgobj, fname):
assert fname in pkgobj.data["files"]
return os.path.join(self.dir, pkgobj.name, fname)
from subprocess import check_call, check_output, Popen, PIPE
from tempfile import NamedTemporaryFile, TemporaryDirectory
import json
class AptlyConfig(object):
"""
Context manager providing an aptly config file
"""
def __init__(self, rootdir):
self.conf = {"rootDir": rootdir} # , "gpgDisableSign": True, "gpgDisableVerify": True}
self.file = None
def __enter__(self):
self.file = NamedTemporaryFile()
with open(self.file.name, "w") as f:
f.write(json.dumps(self.conf))
return self.file.name
def __exit__(self, *args):
self.file.close()
class AptProvider(PkgProvider):
def add_package(self, pkgobj, fname, fobj, params):
# first package added sets the Distribution of the repo
# subsequent package add MUST specify the same dist
if "dist" not in self.repo.data:
self.repo.data["dist"] = params["dist"]
assert self.repo.data["dist"] == params["dist"]
# Generate a GPG key to sign packages in this repo
# TODO support passing keypath=... param to import existing keys and maybe other key generation options
if not os.path.exists(self._gpg_dir):
self._generate_gpg_key()
if "files" not in pkgobj.data:
pkgobj.data["files"] = plist()
if fname in pkgobj.data["files"]:
raise DuplicateException("File {} already in package {}-{}".format(fname, pkgobj.name, pkgobj.version))
with AptlyConfig(self.dir) as conf:
if not os.path.exists(os.path.join(self.dir, "db")):
os.makedirs(self.dir, exist_ok=True)
check_call(["aptly", "-config", conf, "repo", "create",
"-distribution", self.repo.data["dist"], "main"]) # TODO dist param
# put the file somewhere for now
with TemporaryDirectory() as tdir:
tmppkgpath = os.path.join(tdir, fname)
with open(tmppkgpath, "wb") as fdest:
shutil.copyfileobj(fobj, fdest)
check_call(["aptly", "-config", conf, "repo", "add", "main", tmppkgpath])
if not os.path.exists(os.path.join(self.dir, "public")):
check_call(["aptly", "-config", conf, "publish", "repo", "main"],
env=self._env)
else:
check_call(["aptly", "-config", conf, "publish", "update",
"-force-overwrite", self.repo.data["dist"]],
env=self._env)
# Make the public key available for clients
self._export_pubkey()
pkgobj.data["files"].append(fname)
# TODO validate deb file name version against user passed version
def browse(self, args):
if not args:
with open(self.pubkeypath) as f:
pubkey_body = f.read()
return "<plaintext>{}\n\nSigning key:\n\n\n{}".format(self._get_gpg_info(), pubkey_body)
fpath = os.path.abspath(os.path.join(self.dir, "public", *args))
if not os.path.exists(fpath):
raise cherrypy.HTTPError(404)
return cherrypy.lib.static.serve_file(fpath)
def _get_gpg_info(self):
return check_output(["gpg", "--list-keys"], env=self._env).decode("UTF-8")
def _generate_gpg_key(self):
"""
Generate a GPG key for signing packages in this repo. Because only gpg2 supports unattended generation of
passwordless keys we generate the key with gpg2 then export/import it into gpg1.
"""
# Generate the key
os.makedirs(self._gpg_dir)
proc = Popen(["gpg", "--batch", "--gen-key"], stdin=PIPE, env=self._env)
proc.stdin.write("""%no-protection
Key-Type: rsa
Key-Length: 1024
Subkey-Type: default
Subkey-Length: 1024
Name-Real: Apt Master
Name-Comment: Apt signing key
Name-Email: aptmaster@localhost
Expire-Date: 0
%commit""".encode("ascii"))
proc.stdin.close()
proc.wait()
assert proc.returncode == 0
# Export the private key
keydata = check_output(["gpg", "--export-secret-key", "--armor", "aptmaster@localhost"], env=self._env)
shutil.rmtree(self._gpg_dir)
os.makedirs(self._gpg_dir)
# Import the private key
proc = Popen(["gpg1", "--import"], stdin=PIPE, env=self._env)
proc.stdin.write(keydata)
proc.stdin.close()
proc.wait()
assert proc.returncode == 0
def _export_pubkey(self):
if not os.path.exists(self.pubkeypath):
keydata = check_output(["gpg", "--export", "--armor", "aptmaster@localhost"], env=self._env)
with open(self.pubkeypath, "wb") as f:
f.write(keydata)
@property
def pubkeypath(self):
return os.path.join(self.dir, "public", "repo.key")
@property
def _env(self):
"""
Return env vars to be used for subprocesses of this module
"""
print(os.environ["PATH"])
return {"GNUPGHOME": self._gpg_dir,
"PATH": os.environ["PATH"]}
@property
def _gpg_dir(self):
return os.path.normpath(os.path.join(self.dir, "gpg"))
def get_path(self, pkgobj, fname):
assert fname in pkgobj.data["files"]
return os.path.join(self.dir, "public", "pool", "main", pkgobj.name[0], pkgobj.name, fname)
providers = {"pypi": PyPiProvider,
"apt": AptProvider}