diff --git a/README.md b/README.md index 0bf00ee..e9d16b7 100644 --- a/README.md +++ b/README.md @@ -43,6 +43,14 @@ apt-get install -y extpython-python3.6 ``` +Replication +----------- + +Repobot can automatically copy packages to neighbor repobot instances. Pass `-n` with one or more neighbor addresses in +the format of `http://1.2.3.4:8080`. When new packages are submitted, they will be queued for replication. The server +will periodically attempt to submit the package to each neighbor. + + Notes ----- diff --git a/repobot/provider.py b/repobot/provider.py index 14ad1af..acdf7e3 100644 --- a/repobot/provider.py +++ b/repobot/provider.py @@ -5,6 +5,10 @@ from jinja2 import Environment, FileSystemLoader, select_autoescape import cherrypy +class DuplicateException(Exception): + pass + + class PkgProvider(object): def __init__(self, db, repo, datadir): """ @@ -26,6 +30,12 @@ class PkgProvider(object): """ raise NotImplementedError() + def get_path(self, pkgobj, fname): + """ + Get the path to a package file in the repo + """ + raise NotImplementedError() + class PyPiProvider(PkgProvider): def add_package(self, pkgobj, fname, fobj, params): @@ -33,7 +43,7 @@ class PyPiProvider(PkgProvider): pkgobj.data["files"] = plist() if fname in pkgobj.data["files"]: - raise Exception("File {} already in package {}-{}".format(fname, pkgobj.name, pkgobj.version)) + raise DuplicateException("File {} already in package {}-{}".format(fname, pkgobj.name, pkgobj.version)) pkgdir = os.path.join(self.dir, pkgobj.name) os.makedirs(pkgdir, exist_ok=True) @@ -65,6 +75,10 @@ class PyPiProvider(PkgProvider): fpath = os.path.join(self.dir, args[0], args[1]) return cherrypy.lib.static.serve_file(os.path.abspath(fpath), "application/octet-stream") + def get_path(self, pkgobj, fname): + assert fname in pkgobj.data["files"] + return os.path.join(self.dir, pkgobj.name, fname) + from subprocess import check_call, check_output, Popen, PIPE from tempfile import NamedTemporaryFile, TemporaryDirectory @@ -105,8 +119,7 @@ class AptProvider(PkgProvider): if "files" not in pkgobj.data: pkgobj.data["files"] = plist() if fname in pkgobj.data["files"]: - # raise Exception("File {} already in package {}-{}".format(fname, pkgobj.name, pkgobj.version)) - pass + raise DuplicateException("File {} already in package {}-{}".format(fname, pkgobj.name, pkgobj.version)) with AptlyConfig(self.dir) as conf: if not os.path.exists(os.path.join(self.dir, "db")): @@ -194,6 +207,10 @@ Expire-Date: 0 def _gpg_dir(self): return os.path.join(self.dir, "gpg") + def get_path(self, pkgobj, fname): + assert fname in pkgobj.data["files"] + return os.path.join(self.dir, "public", "pool", "main", pkgobj.name[0], pkgobj.name, fname) + providers = {"pypi": PyPiProvider, "apt": AptProvider} diff --git a/repobot/replication.py b/repobot/replication.py new file mode 100644 index 0000000..f7f69b0 --- /dev/null +++ b/repobot/replication.py @@ -0,0 +1,76 @@ +from urllib.parse import urlparse, urlunsplit, urlencode +from time import sleep +from threading import Thread +from repobot.provider import providers +import logging +import os +from requests import post + + +log = logging.getLogger("replication") + + +class RepoReplicator(object): + def __init__(self, db, data_root, neighbors): + """ + :param neighbors: list of replication neighbor uris like 'http://1.2.3.4:8080' + """ + self.db = db + self.data_root = data_root + self.neighbors = [urlparse(i) for i in neighbors] + self.worker = None + + def start(self): + if not self.neighbors: + return + self.worker = ReplicationWorker(self) + self.worker.start() + + +class ReplicationWorker(Thread): + def __init__(self, master): + super().__init__() + self.daemon = True + self.master = master + + def run(self): + while True: + with self.master.db.db.transaction() as c: + # for item in c.root.sendqueue: + log.info("items in queue: %s", len(c.root.sendqueue)) + if len(c.root.sendqueue) > 0: + item = c.root.sendqueue[0] + if self.replicate(item): + c.root.sendqueue.pop(0) + log.info("Replication successful") + sleep(5) + + def replicate(self, item): + item_type, item = item + if item_type == "package": + return self.replicate_package(item) + + def replicate_package(self, item): + repo, pkg, fname, params = item + datadir = os.path.join(self.master.data_root, repo.provider, repo.name) + provider = providers[repo.provider](self.master.db, repo, datadir) + fpath = provider.get_path(pkg, fname) + + for neighbor in self.master.neighbors: + q_params = {"provider": repo.provider, + "reponame": repo.name, + "name": pkg.name, + "version": pkg.version} + q_params.update(**params) + url = urlunsplit(["http", neighbor.netloc, "/addpkg", urlencode(q_params), None]) + with open(fpath, 'rb') as fitem: + try: + r = post(url, files={'f': (fname, fitem)}, timeout=(10, 30)) + if r.status_code not in (200, 409): + r.raise_for_status() + except Exception as e: + log.warning("Failed replication of %s to %s: %s", pkg, neighbor.geturl(), str(e)) + return False + log.info("Replicated %s to %s", pkg, neighbor.geturl()) + + return True diff --git a/repobot/repos.py b/repobot/repos.py index 5d132fd..7694a32 100644 --- a/repobot/repos.py +++ b/repobot/repos.py @@ -28,6 +28,9 @@ class RepoPackage(persistent.Persistent): self.version = version self.data = pmap() + def __str__(self): + return "".format(self.name, self.version) + class RepoDb(object): def __init__(self, db_path, data_root): @@ -38,13 +41,19 @@ class RepoDb(object): with self.db.transaction() as c: if "repos" not in c.root(): c.root.repos = BTrees.OOBTree.BTree() + if "sendqueue" not in c.root(): + c.root.sendqueue = plist() def add_package(self, provider, reponame, pkgname, pkgversion, fname, fobj, params): with self.db.transaction() as c: repo = self._get_repo(c, provider, reponame) datadir = os.path.join(self.data_root, provider, reponame) provider = providers[repo.provider](self.db, repo, datadir) - provider.add_package(repo.get_package(pkgname, pkgversion), fname, fobj, params) + # Add the package + pkg = repo.get_package(pkgname, pkgversion) + provider.add_package(pkg, fname, fobj, params) + # Pack successfully added, queue the file for replication + c.root.sendqueue.append(("package", (repo, pkg, fname, params, ))) def _get_repo(self, c, provider, name): if provider not in c.root.repos: diff --git a/repobot/server.py b/repobot/server.py index fe57a9c..95323fb 100644 --- a/repobot/server.py +++ b/repobot/server.py @@ -1,6 +1,8 @@ import cherrypy import logging from repobot.repos import RepoDb +from repobot.provider import DuplicateException +from repobot.replication import RepoReplicator class AppWeb(object): @@ -9,12 +11,27 @@ class AppWeb(object): @cherrypy.expose def addpkg(self, provider, reponame, name, version, f, **params): - self.db.add_package(provider, reponame, name, version, f.filename, f.file, params) + try: + self.db.add_package(provider, reponame, name, version, f.filename, f.file, params) + except DuplicateException: + raise cherrypy.HTTPError(409, 'Package already exists') @cherrypy.expose def repo(self, provider, repo, *args): return self.db.browse_repo(provider, repo, args) + @cherrypy.expose + def index(self): + yield "
"
+        with self.db.db.transaction() as c:
+            for provider, repos in c.root.repos.items():
+                for reponame, repo in repos.items():
+                    print(repo)
+                    for pkgname, versions in repo.packages.items():
+                        for version, pkg in versions.items():
+                            for fname in pkg.data["files"]:
+                                yield "{}/{}/{}/{}/{}\n".format(provider, reponame, pkgname, version, fname)
+
 
 class FlatDispatch(cherrypy.dispatch.Dispatcher):
     def __init__(self, method):
@@ -38,6 +55,7 @@ def main():
     parser.add_argument('-p', '--port', default=8080, type=int, help="tcp port to listen on")
     parser.add_argument('-s', '--database', default="./repos.db", help="path to persistent database")
     parser.add_argument('-d', '--data-root', default="./data/", help="data storage dir")
+    parser.add_argument('-n', '--neighbors', nargs="+", default=[], help="Replication neighbor uris")
     parser.add_argument('--debug', action="store_true", help="enable development options")
 
     args = parser.parse_args()
@@ -46,6 +64,9 @@ def main():
                         format="%(asctime)-15s %(levelname)-8s %(filename)s:%(lineno)d %(message)s")
 
     db = RepoDb(args.database, args.data_root)
+    repl = RepoReplicator(db, args.data_root, args.neighbors)
+
+    repl.start()
 
     web = AppWeb(db)
 
diff --git a/requirements.txt b/requirements.txt
index 94d3249..00dd9fa 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,6 @@
 backports.functools-lru-cache==1.5
 BTrees==4.5.1
+certifi==2018.10.15
 chardet==3.0.4
 cheroot==6.5.2
 CherryPy==18.0.1
@@ -8,6 +9,7 @@ deb-pkg-tools==4.5
 executor==21.2
 fasteners==0.14.1
 humanfriendly==4.16.1
+idna==2.7
 jaraco.functools==1.20
 Jinja2==2.10
 MarkupSafe==1.0
@@ -19,9 +21,11 @@ property-manager==2.3.1
 python-debian==0.1.33
 python-memcached==1.59
 pytz==2018.5
+requests==2.20.0
 six==1.11.0
 tempora==1.13
 transaction==2.2.1
+urllib3==1.24
 verboselogs==1.7
 zc.lockfile==1.3.0
 ZConfig==3.3.0