# (extraction metadata, not code: original file was 461 lines / 14 KiB)

import os
import sqlite3
import requests
import gzip
import argparse
import tempfile
from contextlib import closing
from email import message_from_string
from dataclasses import dataclass
def message_to_string(message):
    """Serialize an email.message.Message to text, dropping the final two
    characters (the trailing blank-line terminator ``as_string`` emits)."""
    text = message.as_string()
    return text[:-2]
def dict_factory(c, row):
    """sqlite3 row factory: turn each result row into a {column: value} dict.

    c: the sqlite3.Cursor (its .description carries the column names)
    row: the raw result tuple
    """
    # dict comprehension replaces the manual loop-and-assign
    return {col[0]: row[idx] for idx, col in enumerate(c.description)}
def get_db(db_file):
    """Open the package database, creating the schema if needed.

    Returns an sqlite3 connection whose rows come back as dicts.

    NOTE(review): the CREATE TABLE text was partially garbled in this copy of
    the file; the column lists below are reconstructed from the INSERT/REPLACE
    statements used elsewhere in this file -- confirm against a pristine
    checkout.
    """
    db = sqlite3.connect(db_file)
    db.row_factory = dict_factory
    queries = [
        # packages is the pool of all deb packages
        """CREATE TABLE IF NOT EXISTS 'packages' (
            'name' TEXT,
            'version' TEXT,
            'arch' TEXT,
            'fname' TEXT,
            'sha256' TEXT,
            'has_file' BOOLEAN DEFAULT(0),
            'metadata' TEXT, -- from ubuntu/dists/focal/main/binary-amd64/Packages.gz
            UNIQUE(name, version, arch)
        );""",
        # repo_package is a mapping of package -> dist,component,arch
        """CREATE TABLE IF NOT EXISTS 'repo_package' (
            'dist' TEXT,
            'component' TEXT,
            'arch' TEXT,
            'name' TEXT,
            'version' TEXT,
            UNIQUE(dist, component, arch, name, version)
        );""",
        # dist_meta holds each dist's (pruned) Release message; UNIQUE(dist)
        # so the REPLACE INTO in Dist.update_metadata acts as an upsert
        """CREATE TABLE IF NOT EXISTS 'dist_meta' (
            'dist' TEXT,
            'metadata' TEXT,
            UNIQUE(dist)
        );""",
    ]
    with closing(db.cursor()) as c:
        for query in queries:
            c.execute(query)
    db.commit()
    return db
def parse_dist_release(data):
    """Parse a dist Release file's SHA256 section.

    Returns a mapping of repo-relative path -> (sha256_hex, size_in_bytes).

    NOTE(review): the line that accumulated results was lost in this copy of
    the file; the returned shape is reconstructed from the (commented-out)
    caller in Repoline.get_packages, which iterates the result as file
    names -- confirm against a pristine checkout.
    """
    body = message_from_string(data)
    files = {}
    for line in body["SHA256"].split("\n"):
        if not line:
            continue  # 1st line is blank
        hash_, size, path = line.split()
        files[path] = (hash_, int(size))
    return files
def fetch_packages_file(url):
    """Fetch and decode a component's Packages index.

    Try the .gz extension first, then the plain file, then raise an error
    because we don't support .xz.

    url: the URL of the plain Packages file (without extension)
    """
    resp = requests.get(url + ".gz")
    if resp.status_code == 200:
        return gzip.decompress(resp.content).decode()
    # fall back to the uncompressed file (the original condition here was
    # inverted relative to the comment above: it only retried the plain
    # file when the .gz did NOT 404)
    resp = requests.get(url)
    if resp.status_code == 200:
        return resp.text
    # TODO support the .xz Packages.xz format
    raise Exception("couldn't fetch Packages file at {} (.xz not supported)".format(url))
class Repoline:
    """Repoline represents one line in an apt sources.list file."""

    def __init__(self, *, base_url, arch, dist, components):
        self.base_url = base_url    # repo root URL, always "/"-terminated
        self.arch = arch            # e.g. "amd64"
        self.dist = dist            # e.g. "focal"
        self.components = components  # e.g. ["main", "restricted"]

    def get_packages(self):
        """Fetch the dist's Release file and every component's Packages index.

        Returns (release_message, {component_name: [package_message, ...]}).
        """
        packages = {}
        # get the Release file
        dist_path = "{}dists/{}/".format(self.base_url, self.dist)
        r = requests.get("{}Release".format(dist_path))
        release = message_from_string(r.text)
        # drop the bulky hash/index sections, keeping only descriptive fields
        ignore_keys = ('MD5Sum', 'SHA1', 'SHA256', 'Acquire-By-Hash', )
        for key in ignore_keys:
            del release[key]
        for component in self.components:
            # e.g. main/binary-amd64/Packages
            component_prefix = "{}/binary-{}/Packages".format(component, self.arch)
            # NOTE: we probe the .gz/plain variants directly instead of
            # trusting the Release listing, because some repos list the plain
            # "Packages" file even though only the .gz or .xz is available
            data = fetch_packages_file("{}{}".format(dist_path, component_prefix))
            # stanzas are separated by blank lines; the final split element
            # is the empty trailing chunk, so drop it
            packages[component] = [
                message_from_string(stanza)
                for stanza in data.split("\n\n")[0:-1]
            ]
        return release, packages

    @staticmethod
    def parse(line) -> "Repoline":
        """Parse 'deb [arch=xxx] http://... focal main restricted'."""
        parts = line.split()
        if parts[0] != "deb":
            raise Exception("expected deb line to start with 'deb' but got '{}'".format(parts[0]))
        # discard the 'deb' prefix
        parts.pop(0)
        # TODO parse or require arch; assume amd64 for now
        arch = "amd64"
        if parts[0].startswith("["):
            # discard '[arch=xxx]'
            parts.pop(0)
        # now we have the base url
        base_url = parts.pop(0)
        if not base_url.endswith("/"):
            base_url = base_url + "/"
        # and the dist; everything left is the component list
        dist = parts.pop(0)
        return Repoline(base_url=base_url, arch=arch, dist=dist, components=parts)
class Repo(object):
    """An on-disk mirror: an sqlite package database plus a files/ pool."""

    def __init__(self, path):
        self.db_path = path
        self.db = get_db(os.path.join(path, "packages.db"))
        self.dists = {}  # name -> Dist, lazily created

    def cursor(self):
        return self.db.cursor()

    def get_dist(self, name):
        # cache Dist wrappers by name
        if dist := self.dists.get(name):
            return dist
        dist = Dist(self, name)
        self.dists[name] = dist
        return dist

    def import_source_metadata(self, line):
        """Import the Release + Packages metadata for one sources.list line.

        line: a Repoline
        """
        release, packages = line.get_packages()
        dist = self.get_dist(line.dist)
        dirty = False
        with closing(self.db.cursor()) as c:
            dist.update_metadata(c, release)
            for component_name in line.components:
                component = dist.get_component(component_name)
                arch = component.get_arch(line.arch)
                for package in packages[component_name]:
                    dirty = arch.add_package(c, package) or dirty
        # NOTE(review): the commit was garbled in this copy ("# if dirty:");
        # committing unconditionally is safe, if slightly wasteful
        self.db.commit()

    def import_source_packages(self, line):
        """Download the .deb file for every pool entry we don't yet have."""
        # TODO parallelize downloads
        with closing(self.db.cursor()) as c:
            c.execute("SELECT * FROM packages WHERE has_file=0;")
            to_download = c.fetchall()
            for row in to_download:
                metadata = message_from_string(row["metadata"])
                print("downloading", metadata["Package"], "@", metadata["Version"])
                # NOTE(review): the start of this call was lost in this copy;
                # reconstructed from the surviving url= keyword and the
                # packages table's fname/sha256 columns -- confirm
                self.add_file(row["fname"], row["sha256"],
                              url=line.base_url + metadata["Filename"])
                c.execute("UPDATE packages SET has_file=1 WHERE name=? AND version=? AND arch=?;",
                          (metadata["Package"], metadata["Version"], metadata["Architecture"], ))
                self.db.commit()

    def add_file(self, filename, sha256, fpath=None, url=None):
        """Acquire the file and move it into the repo's sha path.

        Exactly one of fpath/url must be given (fpath is not implemented yet).
        """
        if (fpath and url) or (not fpath and not url):
            raise Exception("must specify fpath or url but not both")
        if fpath:
            raise Exception("fpath not supported yet, use url")
        local_dir = os.path.join(self.db_path, "files", sha256[0])
        local_path = os.path.join(local_dir, filename)
        if os.path.exists(local_path):  # skip files we already have
            return
        # download into a temp dir first so a partial download never lands
        # in the pool
        with tempfile.TemporaryDirectory() as tmp:
            ftmp = os.path.join(tmp, "ftmp")
            with open(ftmp, "wb") as f:
                resp = requests.get(url, stream=True)
                for chunk in resp.iter_content(chunk_size=256 * 1024):
                    f.write(chunk)
            os.makedirs(local_dir, exist_ok=True)
            os.rename(ftmp, local_path)
class Dist(object):
    """One distribution (e.g. 'focal') within a Repo."""

    def __init__(self, repo, name):
        # original had the collapsed line `self.repo = repo = name`,
        # which set both attributes to the dist name
        self.repo = repo
        self.name = name
        self.components = {}  # name -> Component, lazily created

    def get_component(self, name):
        # cache Component wrappers by name
        if component := self.components.get(name):
            return component
        component = Component(self, name)
        self.components[name] = component
        return component

    def update_metadata(self, c, release):
        """Store this dist's (pruned) Release message in dist_meta."""
        # original was missing the first bind parameter: `(, message_to_...`
        c.execute("REPLACE INTO dist_meta (dist, metadata) VALUES (?, ?);",
                  (self.name, message_to_string(release), ))
class Component(object):
    """One component (e.g. 'main') within a Dist."""

    def __init__(self, dist, name):
        # original had the collapsed line `self.dist = dist = name`,
        # which set both attributes to the component name
        self.dist = dist
        self.name = name
        self.arches = {}  # name -> Arch, lazily created

    def get_arch(self, name):
        # cache Arch wrappers by name
        if arch := self.arches.get(name):
            return arch
        arch = Arch(self, name)
        self.arches[name] = arch
        return arch
class Arch(object):
    """One architecture (e.g. 'amd64') within a Component."""

    def __init__(self, component, name):
        # original had the collapsed line `self.component = component = name`
        self.component = component
        self.name = name

    def add_package(self, c, metadata):
        """Insert the package into the pool and map it into this dist.

        c: db cursor; metadata: the package's parsed Packages stanza.
        Returns True if we still need to download the file, False if the
        pool already had this name/version/arch.
        """
        c.execute("SELECT * FROM packages WHERE name=? AND version=? AND arch=?;",
                  (metadata["Package"], metadata["Version"], metadata["Architecture"], ))
        row = c.fetchone()
        if not row:
            # NOTE(review): the bind tuple was truncated in this copy; the
            # fname (basename of Filename) and sha256 values are
            # reconstructed from the table schema -- confirm
            c.execute("INSERT INTO packages (name, version, arch, fname, sha256, metadata) VALUES (?, ?, ?, ?, ?, ?);",
                      (metadata["Package"], metadata["Version"], metadata["Architecture"],
                       metadata["Filename"].split("/")[-1], metadata["SHA256"],
                       metadata.as_string()[0:-2], ))
        # insert the package into the dist (idempotent via REPLACE)
        c.execute("REPLACE INTO repo_package (dist, component, arch, name, version) VALUES (?, ?, ?, ?, ?);",
                  (self.component.dist.name, self.component.name, self.name,
                   metadata["Package"], metadata["Version"], ))
        if row:
            return False
        return True
def cmd_mirror(args, parser):
    """Create a repo:
    - containing all the packages from the db
    - containing a subset of packages based on some query
    - containing a subset of packages matching an existing repo

    NOTE(review): unfinished -- collects the package list, then drops into
    the debugger instead of generating/deploying metadata.
    """
    repo = Repo(args.database)
    # filter the packages
    package_query = """SELECT * FROM repo_package;"""
    package_query_params = ()
    from collections import defaultdict
    # (dist,component) -> list((arch,name,version))
    # (the original comment said (name,arch,version) but the code appends
    # (arch, name, version); comment fixed to match the code)
    packages = defaultdict(list)
    with closing(repo.db.cursor()) as c:
        c.execute(package_query, package_query_params)
        for row in c:
            packages[(row["dist"], row["component"], )].append(
                (row["arch"], row["name"], row["version"], ))
    packages = dict(packages)
    # build the metadata files
    #
    # we need to build a structure like:
    #   dists/<dist>/<component>/binary-<arch>/
    #     Release  (very small identifier file)
    #     Packages (the metadata of all the packages we'll include)
    #   dists/<dist>/
    #     Release     (hashes of everything in ./<component>/)
    #     InRelease   (the above, but as a gpg signed message)
    #     Release.gpg (pgp signature for Release file)
    #   (optional? Some kind of index of package contents -- we'll need to
    #    skip it for now anyways as we aren't importing it)
    #   * source is a field in the package's metadata; TODO add it to the db
    #
    # plan:
    #   for each component:
    #     for each package:
    #       link the package file into place, if needed
    #   for each dist:
    #     for each component:
    #       for each arch:
    #         generate Packages file
    #     generate the Release / InRelease metadata file
    #   sign the files
    #   put the packages in place
    import pdb
    pdb.set_trace()
def cmd_import(args, parser):
    """Ingest metadata and/or .deb files for one apt sources.list line.

    --meta limits the run to phase 1, --debs to phase 2 (per the argparse
    help text in main()).
    """
    if not args.line:
        # TODO implement reading every line from the given sources.list file
        print("--file not yet supported")
        return
    line = Repoline.parse(args.line)
    r = Repo(args.database)
    # phase 1, get metadata
    if not args.debs:
        r.import_source_metadata(line)
    # phase 2, get the .deb files
    if not args.meta:
        r.import_source_packages(line)
def cmd_shell(args, parser):
    """Drop into an interactive debugger with `repo` bound to the database."""
    repo = Repo(args.database)
    import pdb
    pdb.set_trace()
def main():
    """Parse arguments and dispatch to the selected subcommand."""
    parser = argparse.ArgumentParser(description="apt repo mirroring tool")
    parser.add_argument("--database", required=True, help="package database path")
    sp_action = parser.add_subparsers(dest="action", help="action to take")
    # each subparser binds its handler via set_defaults; without these,
    # args.func below would raise AttributeError
    p_shell = sp_action.add_parser("shell", help="interactive shell")
    p_shell.set_defaults(func=cmd_shell)
    p_mirror = sp_action.add_parser("mirror", help="deploy a repo")
    p_mirror.set_defaults(func=cmd_mirror)
    p_ingest = sp_action.add_parser("ingest", help="import packages from existing repos")
    p_ingest.set_defaults(func=cmd_import)
    ingest_source = p_ingest.add_mutually_exclusive_group(required=True)
    ingest_source.add_argument("--line", help="import packages from a single apt sources.list source")
    ingest_source.add_argument("--file", help="import packages all sources in the given sources.list file")
    ingest_method = p_ingest.add_mutually_exclusive_group()
    ingest_method.add_argument("--meta", action="store_true", help="only import metadata")
    ingest_method.add_argument("--debs", action="store_true", help="only download packages")
    args = parser.parse_args()
    if not getattr(args, "func", None):
        parser.error("an action is required")
    args.func(args, parser)
# r = Repo("./testrepo/")
# r = Repo("./testef/")
# focal = r.get_dist("focal")
# focal_main = focal.get_component("main")
# focal_main_x64 = focal_main.get_arch("binary-amd64")
# focal_main_x64.add_package("x", "y")
# r.deploy(path="./www/")
# r.import_source('deb focal main restricted')
# r.import_source('deb focal main')
# import pdb
# pdb.set_trace()
# pass