"""Minimal apt repository mirroring tool.

Imports package metadata from existing apt repositories into a local
sqlite index and downloads the .deb files into a sha-addressed file pool.
"""

import os
import sqlite3
import requests
import gzip
import argparse
import tempfile
import shutil
from contextlib import closing
from email import message_from_string
from dataclasses import dataclass


def dict_factory(c, row):
    """sqlite3 row factory returning each row as a {column_name: value} dict."""
    d = {}
    for idx, col in enumerate(c.description):
        d[col[0]] = row[idx]
    return d


def get_db(db_file):
    """Open (creating tables if needed) the package database at *db_file*.

    Returns an sqlite3 connection whose rows are plain dicts.
    """
    db = sqlite3.connect(db_file)
    db.row_factory = dict_factory
    queries = [
        # packages is the pool of all deb packages
        """CREATE TABLE IF NOT EXISTS 'packages' (
            'name' TEXT,
            'version' TEXT,
            'arch' TEXT,
            'fname' TEXT,
            'sha256' TEXT,
            'has_file' BOOLEAN DEFAULT(0),
            'metadata' TEXT,  -- from ubuntu/dists/focal/main/binary-amd64/Packages.gz
            UNIQUE(name, version, arch),
            UNIQUE(fname),
            UNIQUE(sha256)
        )""",
        # repo_package is a mapping of package -> dist,component,arch
        """CREATE TABLE IF NOT EXISTS 'repo_package' (
            'dist' TEXT,
            'component' TEXT,
            'arch' TEXT,
            'name' TEXT,
            'version' TEXT,
            UNIQUE(dist, component, arch, name, version)
        )""",
    ]
    with closing(db.cursor()) as c:
        for query in queries:
            c.execute(query)
    return db


def parse_dist_release(data):
    """Parse a dists/<dist>/Release file and return the file paths listed
    in its SHA256 section.

    Currently unused (see the disabled logic in Repoline.get_packages).
    """
    body = message_from_string(data)
    files = []
    for line in body["SHA256"].split("\n"):
        if not line:
            continue  # 1st line is blank
        hash_, size, path = line.split()
        files.append(path)
    return files


def fetch_packages_file(url):
    """Fetch a component's Packages index, e.g.
    http://archive.ubuntu.com/ubuntu/dists/focal/main/binary-amd64/Packages

    Try the .gz extension first, then the plain file, then raise an error
    because we don't support .xz.
    """
    resp = requests.get(url + ".gz")
    if resp.status_code == 200:
        return gzip.decompress(resp.content).decode()
    if resp.status_code != 404:
        resp.raise_for_status()
    # TODO support the .xz Packages.xz format
    resp = requests.get(url)
    resp.raise_for_status()
    return resp.text


class Repoline:
    """Repoline represents one line in an apt sources.list file."""

    def __init__(self, *, base_url, arch, dist, components):
        self.base_url = base_url
        self.arch = arch
        self.dist = dist
        self.components = components

    def get_packages(self):
        """Download and parse each component's Packages index.

        Returns {component_name: [email.message.Message, ...]} with one
        Message per package stanza.
        """
        packages = {}
        # get the Release file
        # lol we don't actually use it
        dist_path = "{}dists/{}/".format(self.base_url, self.dist)
        r = requests.get("{}Release".format(dist_path))
        r.raise_for_status()
        # Release-driven file discovery is disabled because
        # http://archive.ubuntu.com/ will list the plain "Packages" file even
        # though only the .gz or .xz is actually available (lol), so we can't
        # trust it to pick the right extension:
        # release = r.text
        # files = parse_dist_release(release)
        # packages_file = next((f for f in files if f.startswith(component_prefix)), None)
        for component in self.components:
            # e.g. main/binary-amd64/Packages.gz
            component_prefix = "{}/binary-{}/Packages".format(component, self.arch)
            # fetch the packages file; stanzas are separated by blank lines and
            # the index ends with a trailing separator, hence the [0:-1]
            packages[component] = [
                message_from_string(p)
                for p in fetch_packages_file("{}{}".format(dist_path, component_prefix)).split("\n\n")[0:-1]
            ]
        return packages

    @staticmethod
    def parse(line) -> "Repoline":
        """Parse 'deb [arch=xxx] http://archive.ubuntu.com/ubuntu/ focal main restricted'."""
        line = line.split()
        # discard the 'deb' prefix
        if line[0] != "deb":
            raise Exception("expected deb line to start with 'deb' but got '{}'".format(line[0]))
        line.pop(0)
        # TODO parse or require arch
        # discard '[arch=xxx]'
        if line[0].startswith("["):
            line.pop(0)
        # assume amd64 for now
        arch = "amd64"
        # now we have the base url
        base_url = line.pop(0)
        if not base_url.endswith("/"):
            base_url = base_url + "/"
        # and the dist
        dist = line.pop(0)
        return Repoline(base_url=base_url, arch=arch, dist=dist, components=line)


class Repo(object):
    """A local mirror: sqlite package index plus a sha-addressed file pool."""

    def __init__(self, path):
        self.db_path = path
        self.db = get_db(os.path.join(path, "packages.db"))
        self.dists = {}

    def cursor(self):
        return self.db.cursor()

    def get_dist(self, name):
        """Return the (cached) Dist object for *name*, creating it if needed."""
        if dist := self.dists.get(name):
            return dist
        dist = Dist(self, name)
        self.dists[name] = dist
        return dist

    def import_source_metadata(self, line):
        """Phase 1: import package metadata for every component of *line*."""
        packages = line.get_packages()
        dist = self.get_dist(line.dist)
        with closing(self.db.cursor()) as c:
            for component_name in line.components:
                component = dist.get_component(component_name)
                arch = component.get_arch(line.arch)
                for package in packages[component_name]:
                    arch.add_package(c, package)
        # BUGFIX: previously COMMIT only ran when a new packages row was
        # inserted, but add_package always REPLACEs into repo_package — those
        # writes were silently rolled back when no new packages appeared.
        # Connection.commit() is a safe no-op when nothing is pending.
        self.db.commit()

    def import_source_packages(self, line):
        """Phase 2: download the .deb file for every package we don't have yet."""
        # TODO parallelize downloads
        with closing(self.db.cursor()) as c:
            c.execute("SELECT * FROM packages WHERE has_file=0;")
            to_download = c.fetchall()
            for row in to_download:
                metadata = message_from_string(row["metadata"])
                print("downloading", metadata["Package"], "@", metadata["Version"])
                self.add_file(os.path.basename(metadata["Filename"]),
                              metadata["sha256"],
                              url=line.base_url + metadata["Filename"])
                c.execute("UPDATE packages SET has_file=1 WHERE name=? AND version=? AND arch=?;",
                          (metadata["Package"], metadata["Version"], metadata["Architecture"], ))
                # BUGFIX: was a single `c.execute("COMMIT")` after the loop,
                # which raises OperationalError when to_download is empty (a
                # bare SELECT opens no transaction). Committing per package
                # also means progress survives a crash mid-download.
                self.db.commit()

    def add_file(self, filename, sha256, fpath=None, url=None):
        """Acquire a file and move it into the repo's sha-addressed pool.

        Exactly one of *fpath* (local file) or *url* (download) must be given;
        only *url* is implemented so far. Already-present files are skipped.
        """
        if (fpath and url) or (not fpath and not url):
            raise Exception("must specify fpath or url but not both")
        if fpath:
            raise Exception("fpath not supported yet, use url")
        local_dir = os.path.join(self.db_path, "files", sha256[0])
        local_path = os.path.join(local_dir, filename)
        if os.path.exists(local_path):
            # skip files we already have
            return
        with tempfile.TemporaryDirectory() as tmp:
            ftmp = os.path.join(tmp, "ftmp")
            with open(ftmp, "wb") as f:
                resp = requests.get(url, stream=True)
                resp.raise_for_status()
                for chunk in resp.iter_content(chunk_size=256 * 1024):
                    f.write(chunk)
            os.makedirs(local_dir, exist_ok=True)
            # BUGFIX: os.rename fails with EXDEV when the temp dir and the
            # repo live on different filesystems; shutil.move handles that.
            shutil.move(ftmp, local_path)


class Dist(object):
    """One distribution (e.g. 'focal') within a Repo."""

    def __init__(self, repo, name):
        self.repo = repo
        self.name = name
        self.components = {}

    def get_component(self, name):
        """Return the (cached) Component for *name*, creating it if needed."""
        if component := self.components.get(name):
            return component
        component = Component(self, name)
        self.components[name] = component
        return component


class Component(object):
    """One component (e.g. 'main') within a Dist."""

    def __init__(self, dist, name):
        self.dist = dist
        self.name = name
        self.arches = {}

    def get_arch(self, name):
        """Return the (cached) Arch for *name*, creating it if needed."""
        if arch := self.arches.get(name):
            return arch
        arch = Arch(self, name)
        self.arches[name] = arch
        return arch


class Arch(object):
    """One architecture (e.g. 'amd64') within a Component."""

    def __init__(self, component, name):
        self.component = component
        self.name = name

    def add_package(self, c, metadata):
        """Insert *metadata* into the pool and map it into this dist.

        Returns True when the package is new and its file still needs to be
        downloaded, False when the pool already had it.
        """
        c.execute("SELECT * FROM packages WHERE name=? AND version=? AND arch=?;",
                  (metadata["Package"], metadata["Version"], metadata["Architecture"], ))
        row = c.fetchone()
        if not row:
            # insert the package into the pool
            c.execute("INSERT INTO packages (name, version, arch, fname, sha256, metadata) VALUES (?, ?, ?, ?, ?, ?);",
                      (metadata["Package"],
                       metadata["Version"],
                       metadata["Architecture"],
                       os.path.basename(metadata["Filename"]),
                       metadata["SHA256"],
                       metadata.as_string()[0:-2], ))  # [0:-2] trims the trailing blank line
        # insert the package into the dist
        c.execute("REPLACE INTO repo_package (dist, component, arch, name, version) VALUES (?, ?, ?, ?, ?);",
                  (self.component.dist.name,
                   self.component.name,
                   self.name,
                   metadata["Package"],
                   metadata["Version"], ))
        if row:
            return False
        return True


def cmd_mirror(args, parser):
    """
    Create a repo
    - containing all the packages from the db
    - containing a subset of packages based on some query
    - containing a subset of packages matching an existing repo
    """
    # filter the packages
    # build the metadata files
    # sign the files
    # put the packages in place
    pass


def cmd_import(args, parser):
    """Import metadata and/or .deb files from a single apt source line."""
    if not args.line:
        print("--file not yet supported")
        return
    line = Repoline.parse(args.line)
    r = Repo(args.database)
    # phase 1, get metadata
    if not args.debs:
        r.import_source_metadata(line)
    # phase 2, get the .deb files
    if not args.meta:
        r.import_source_packages(line)


def main():
    parser = argparse.ArgumentParser(description="apt repo mirroring tool")
    parser.add_argument("--database", required=True, help="package database path")
    # BUGFIX: required=True — otherwise running with no action crashed with
    # AttributeError on args.func instead of printing a usage error
    sp_action = parser.add_subparsers(dest="action", required=True, help="action to take")

    p_ingest = sp_action.add_parser("ingest", help="import packages from existing repos")
    p_ingest.set_defaults(func=cmd_import)
    ingest_source = p_ingest.add_mutually_exclusive_group(required=True)
    ingest_source.add_argument("--line", help="import packages from a single apt sources.list source")
    ingest_source.add_argument("--file", help="import packages all sources in the given sources.list file")
    ingest_method = p_ingest.add_mutually_exclusive_group()
    ingest_method.add_argument("--meta", action="store_true", help="only import metadata")
    ingest_method.add_argument("--debs", action="store_true", help="only download packages")

    args = parser.parse_args()
    args.func(args, parser)


if __name__ == "__main__":
    main()