"""apt repo mirroring tool: ingest package metadata and .deb files into a
local sqlite-indexed store, for later re-serving as a mirror."""

import argparse
import gzip
import lzma
import os
import sqlite3
from contextlib import closing
from dataclasses import dataclass
from email import message_from_string

import requests


def dict_factory(cursor, row):
    """sqlite3 row factory: return each row as a {column_name: value} dict."""
    return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}


def get_db(db_path):
    """Open (creating the schema if needed) the package database under *db_path*.

    Returns an sqlite3 connection whose rows come back as dicts
    (see dict_factory).
    """
    db_file = os.path.join(db_path, "packages.db")
    db = sqlite3.connect(db_file)
    db.row_factory = dict_factory
    queries = [
        """CREATE TABLE IF NOT EXISTS 'packages' (
            'name' TEXT,
            'version' TEXT,
            'arch' TEXT,
            'fname' TEXT,
            'sha256' TEXT,
            'has_file' BOOLEAN DEFAULT(0),
            'metadata' TEXT, -- from ubuntu/dists/focal/main/binary-amd64/Packages.gz"
            UNIQUE(name, version, arch),
            UNIQUE(fname),
            UNIQUE(sha256)
        )""",
    ]
    with closing(db.cursor()) as c:
        for query in queries:
            c.execute(query)
    return db


def request_packages(url):
    """Fetch a repo "Packages" metadata file and return its text content.

    The "Packages" file may be published plain, or compressed with the .gz
    or .xz extension. Each variant is requested in turn until one answers
    successfully; compressed payloads are decompressed before decoding.

    Raises requests.HTTPError if no variant of the file exists.
    """
    # (suffix, decompressor) pairs, tried in order; None means plain text.
    variants = (
        ("", None),
        (".gz", gzip.decompress),
        (".xz", lzma.decompress),
    )
    resp = None
    for suffix, decompress in variants:
        resp = requests.get(url + suffix)
        if resp.ok:
            payload = decompress(resp.content) if decompress else resp.content
            return payload.decode()
    # All variants failed; surface the last HTTP error to the caller.
    resp.raise_for_status()


@dataclass
class Repoline:
    """Repoline represents one line in an apt sources.list file."""

    base_url: str
    arch: str
    dist: str
    components: list[str]

    @property
    def packages_urls(self):
        """URL of the 'Packages' metadata file for each component, e.g.
        http://archive.ubuntu.com/ubuntu/dists/focal/main/binary-amd64/Packages

        (request_packages appends the .gz/.xz extension as needed.)
        """
        urls = {}
        for component in self.components:
            urls[component] = "{}dists/{}/{}/binary-{}/Packages" \
                .format(self.base_url, self.dist, component, self.arch)
        return urls

    def fetch_packages_meta(self, component):
        """Fetch and parse the Packages index for *component*.

        Returns a list of email.message.Message objects, one per package
        stanza (stanzas are blank-line separated, RFC822-style).
        """
        url = self.packages_urls[component]
        # The file ends with a blank line, so the final split element is
        # empty and is dropped.
        data = request_packages(url).split("\n\n")[0:-1]
        return [message_from_string(p) for p in data]

    @staticmethod
    def parse(line) -> "Repoline":
        """Parse a sources.list line such as
        'deb [arch=xxx] http://archive.ubuntu.com/ubuntu/ focal main restricted'

        Raises ValueError if the line does not start with 'deb'.
        """
        line = line.split()
        # discard the 'deb' prefix
        if line[0] != "deb":
            raise ValueError("expected deb line to start with 'deb' but got '{}'".format(line[0]))
        line.pop(0)

        # TODO parse or require arch
        # discard '[arch=xxx]'
        if line[0].startswith("["):
            line.pop(0)
        # assume amd64 for now
        arch = "amd64"

        # now we have the base url
        base_url = line.pop(0)
        if not base_url.endswith("/"):
            base_url = base_url + "/"

        # and the dist; the remaining words are the components
        dist = line.pop(0)

        return Repoline(base_url=base_url, arch=arch, dist=dist, components=line)


def download_file(url, local_path):
    """Stream *url* to *local_path*.

    The request is issued (and checked) before the destination file is
    opened, so a failed request never leaves an empty file behind.
    """
    print("downloading", url)
    resp = requests.get(url, stream=True)
    resp.raise_for_status()
    with open(local_path, "wb") as f:
        # Explicit chunk_size: iter_content() defaults to 1 byte per chunk.
        for chunk in resp.iter_content(chunk_size=64 * 1024):
            f.write(chunk)


def cmd_ingest(args, parser):
    """Ingest package metadata and/or .deb files from the repo in --line.

    --meta restricts the run to metadata only; --debs to downloads only.
    """
    if not args.line:
        print("--file not yet supported")
        return
    repo = Repoline.parse(args.line)
    db = get_db(args.database)

    if not args.debs:
        with closing(db.cursor()) as c:
            new_packages = False
            for component in repo.components:
                print("fetching", component)
                for pkg in repo.fetch_packages_meta(component):
                    # Skip stanzas already ingested (name/version/arch is unique).
                    c.execute(
                        "SELECT count(*) as count FROM packages WHERE name=? AND version=? AND arch=?;",
                        (pkg["Package"], pkg["Version"], pkg["Architecture"], ))
                    if c.fetchone()['count'] > 0:
                        continue
                    new_packages = True
                    # The raw stanza is kept in 'metadata' (minus the trailing
                    # blank line) so fields not modeled as columns survive.
                    c.execute(
                        "INSERT INTO packages (name, version, arch, fname, sha256, metadata) VALUES (?, ?, ?, ?, ?, ?);",
                        (pkg["Package"],
                         pkg["Version"],
                         pkg["Architecture"],
                         os.path.basename(pkg["Filename"]),
                         pkg["SHA256"],
                         pkg.as_string()[0:-2], ))
            if new_packages:
                c.execute("COMMIT")

    if not args.meta:
        # TODO parallelize downloads
        with closing(db.cursor()) as c:
            c.execute("SELECT count(*) as count FROM packages WHERE has_file=0;")
            print("need to download {} packages".format(c.fetchone()["count"]))
            c.execute("SELECT * FROM packages WHERE has_file=0;")
            to_download = c.fetchall()
            for row in to_download:
                meta = message_from_string(row["metadata"])
                url = repo.base_url + meta["Filename"]
                # Files are sharded into subdirs by the first hex digit of
                # their sha256 to keep directory sizes manageable.
                local_dir = os.path.join(args.database, "files", row["sha256"][0])
                os.makedirs(local_dir, exist_ok=True)
                local_path = os.path.join(local_dir, os.path.basename(meta["Filename"]))
                download_file(url, local_path)
                c.execute("UPDATE packages SET has_file=1 WHERE sha256=?;",
                          (row["sha256"], ))
                c.execute("COMMIT")


def cmd_mirror(args, parser):
    """
    Create a repo
    - containing all the packages from the db
    - containing a subset of packages based on some query
    - containing a subset of packages matching an existing repo
    """
    # filter the packages
    # build the metadata files
    # sign the files
    # put the packages in place
    pass


def main():
    """CLI entry point: parse arguments and dispatch to the chosen action."""
    parser = argparse.ArgumentParser(description="apt repo mirroring tool")
    parser.add_argument("--database", required=True, help="package database path")

    sp_action = parser.add_subparsers(dest="action", help="action to take")

    p_ingest = sp_action.add_parser("ingest", help="import packages from existing repos")
    p_ingest.set_defaults(func=cmd_ingest)
    ingest_source = p_ingest.add_mutually_exclusive_group(required=True)
    ingest_source.add_argument("--line", help="import packages from a single apt sources.list source")
    ingest_source.add_argument("--file", help="import packages all sources in the given sources.list file")
    ingest_method = p_ingest.add_mutually_exclusive_group()
    ingest_method.add_argument("--meta", action="store_true", help="only import metadata")
    ingest_method.add_argument("--debs", action="store_true", help="only download packages")

    args = parser.parse_args()
    args.func(args, parser)


if __name__ == '__main__':
    main()