# pydebmirror/pydebmirror/cli2.py

import os
import sqlite3
import requests
import gzip
import argparse
import tempfile
from collections import defaultdict
from contextlib import closing
from email import message_from_string


def message_to_string(message):
    # render an email.message.Message back to text, dropping the trailing
    # newlines that as_string() appends
    return message.as_string()[0:-2]


def dict_factory(c, row):
    # sqlite3 row factory that returns each row as a column-name -> value dict
    d = {}
    for idx, col in enumerate(c.description):
        d[col[0]] = row[idx]
    return d


def get_db(db_file):
    db = sqlite3.connect(db_file)
    db.row_factory = dict_factory
    queries = [
        # packages is the pool of all deb packages
        """CREATE TABLE IF NOT EXISTS 'packages' (
            'name' TEXT,
            'version' TEXT,
            'arch' TEXT,
            'fname' TEXT,
            'sha256' TEXT,
            'has_file' BOOLEAN DEFAULT(0),
            'metadata' TEXT,  -- from e.g. ubuntu/dists/focal/main/binary-amd64/Packages.gz
            UNIQUE(name, version, arch),
            UNIQUE(fname),
            UNIQUE(sha256)
        )""",
        # repo_package is a mapping of package -> dist,component,arch
        """CREATE TABLE IF NOT EXISTS 'repo_package' (
            'dist' TEXT,
            'component' TEXT,
            'arch' TEXT,
            'name' TEXT,
            'version' TEXT,
            UNIQUE(dist, component, arch, name, version)
        )""",
        # dist_meta holds the parsed Release metadata for each dist
        """CREATE TABLE IF NOT EXISTS 'dist_meta' (
            'dist' TEXT,
            'metadata' TEXT,
            UNIQUE(dist)
        )""",
    ]
    with closing(db.cursor()) as c:
        for query in queries:
            c.execute(query)
    return db
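

# The two-table split lets one pool row in 'packages' back any number of
# dist/component/arch entries in 'repo_package'; an illustrative pair of rows
# (values are made up):
#   packages:     name=bash, version=5.0-6ubuntu1, arch=amd64, has_file=1
#   repo_package: dist=focal, component=main, arch=amd64, name=bash, version=5.0-6ubuntu1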


def parse_dist_release(data):
    # parse a dist Release file and return the file paths listed under SHA256
    body = message_from_string(data)
    files = []
    for line in body["SHA256"].split("\n"):
        if not line:
            continue  # first line is blank
        hash_, size, path = line.split()
        files.append(path)
    return files
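

# For reference, the SHA256 block being parsed above looks roughly like
# (hashes and sizes are illustrative):
#   SHA256:
#    e3b0c44298fc1c...  108144  main/binary-amd64/Packages.gz
#    a94c8fe7bb23cc...   82064  main/binary-amd64/Packages.xz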


def fetch_packages_file(url):
    """
    Fetch a Packages index such as
    http://archive.ubuntu.com/ubuntu/dists/focal/main/binary-amd64/Packages
    Try the .gz extension first, then the plain file,
    then raise an error because we don't support .xz.
    """
    resp = requests.get(url + ".gz")
    if resp.status_code == 200:
        return gzip.decompress(resp.content).decode()
    if resp.status_code != 404:
        resp.raise_for_status()
    #TODO support the .xz Packages.xz format
    resp = requests.get(url)
    resp.raise_for_status()
    return resp.text
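

# A minimal sketch of the .xz support the TODO above asks for, using the
# stdlib lzma module; illustrative only, not wired into fetch_packages_file.
def fetch_packages_file_xz(url):
    import lzma
    # e.g. url = ".../binary-amd64/Packages"; fetch the .xz variant and decompress
    resp = requests.get(url + ".xz")
    resp.raise_for_status()
    return lzma.decompress(resp.content).decode()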


class Repoline:
    """
    Repoline represents one line in an apt sources.list file
    """
    def __init__(self, *, base_url, arch, dist, components):
        self.base_url = base_url
        self.arch = arch
        self.dist = dist
        self.components = components

    def get_packages(self):
        packages = {}
        # get the Release file
        # lol we don't actually use it
        dist_path = "{}dists/{}/".format(self.base_url, self.dist)
        r = requests.get("{}Release".format(dist_path))
        r.raise_for_status()
        release = message_from_string(r.text)
        ignore_keys = ('MD5Sum', 'SHA1', 'SHA256', 'Acquire-By-Hash', )
        for key in ignore_keys:
            del release[key]
        # parse out each component's Packages/.gz/.xz file
        # files = parse_dist_release(release)
        for component in self.components:
            # main/binary-amd64/Packages.gz
            component_prefix = "{}/binary-{}/Packages".format(component, self.arch)
            # disabled because http://archive.ubuntu.com/ will list the plain "Packages" file
            # even though only the .gz or .xz is available (lol)
            # find the packages file as it could be one of multiple extensions
            # packages_file = None
            # for fname in files:
            #     if fname.startswith(component_prefix):
            #         print("check", fname)
            #         packages_file = fname
            #         break
            # if not packages_file:
            #     raise Exception("couldn't find packages file for component: {}".format(component))
            # packages_data = fetch_packages_file("{}{}".format(dist_path, packages_file))
            # fetch the packages file; stanzas are separated by blank lines and the
            # file ends with one, hence dropping the final empty split
            packages[component] = [
                message_from_string(p)
                for p in fetch_packages_file("{}{}".format(dist_path, component_prefix)).split("\n\n")[0:-1]
            ]
        return release, packages

    @staticmethod
    def parse(line) -> "Repoline":
        """
        Parse 'deb [arch=xxx] http://archive.ubuntu.com/ubuntu/ focal main restricted'
        """
        line = line.split()
        # discard the 'deb' prefix
        if line[0] != "deb":
            raise Exception("expected deb line to start with 'deb' but got '{}'".format(line[0]))
        line.pop(0)
        #TODO parse or require arch
        # discard '[arch=xxx]'
        if line[0].startswith("["):
            line.pop(0)
        #TODO assume amd64 for now
        arch = "amd64"
        # now we have the base url
        base_url = line.pop(0)
        if not base_url.endswith("/"):
            base_url = base_url + "/"
        # and the dist
        dist = line.pop(0)
        # the remaining tokens are the components
        return Repoline(base_url=base_url, arch=arch, dist=dist, components=line)
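

# A sketch for the "parse or require arch" TODO in Repoline.parse: pull the
# architecture out of the single-option '[arch=xxx]' token instead of
# discarding it. Hypothetical helper, not called by parse as written; apt
# also allows multiple space-separated options and comma-separated arch
# lists, which this ignores.
def parse_arch_option(token, default="amd64"):
    # token is e.g. "[arch=arm64]"; anything unrecognized falls back to default
    if token.startswith("[") and token.endswith("]"):
        key, _, value = token[1:-1].partition("=")
        if key.strip() == "arch" and value:
            return value.strip()
    return default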


class Repo(object):
    def __init__(self, path):
        self.db_path = path
        self.db = get_db(os.path.join(path, "packages.db"))
        self.dists = {}

    def cursor(self):
        return self.db.cursor()

    def get_dist(self, name):
        if dist := self.dists.get(name):
            return dist
        dist = Dist(self, name)
        self.dists[name] = dist
        return dist
    def import_source_metadata(self, line):
        release, packages = line.get_packages()
        dist = self.get_dist(line.dist)
        dirty = False
        with closing(self.db.cursor()) as c:
            dist.update_metadata(c, release)
            for component_name in line.components:
                component = dist.get_component(component_name)
                arch = component.get_arch(line.arch)
                for package in packages[component_name]:
                    dirty = arch.add_package(c, package) or dirty
            # if dirty:
            c.execute("COMMIT")
    def import_source_packages(self, line):
        #TODO parallelize downloads
        with closing(self.db.cursor()) as c:
            c.execute("SELECT * FROM packages WHERE has_file=0;")
            to_download = c.fetchall()
            for row in to_download:
                metadata = message_from_string(row["metadata"])
                print("downloading", metadata["Package"], "@", metadata["Version"])
                self.add_file(os.path.basename(metadata["Filename"]),
                              metadata["sha256"],
                              url=line.base_url + metadata["Filename"])
                c.execute("UPDATE packages SET has_file=1 WHERE name=? AND version=? AND arch=?;",
                          (metadata["Package"], metadata["Version"], metadata["Architecture"], ))
            c.execute("COMMIT")
    def add_file(self, filename, sha256, fpath=None, url=None):
        # acquire the file and move it into the repo's sha-addressed path
        if (fpath and url) or (not fpath and not url):
            raise Exception("must specify fpath or url but not both")
        if fpath:
            raise Exception("fpath not supported yet, use url")
        local_dir = os.path.join(self.db_path, "files", sha256[0])
        local_path = os.path.join(local_dir, filename)
        if os.path.exists(local_path):  # skip files we already have
            return
        with tempfile.TemporaryDirectory() as tmp:
            # download to a temp file, then move it into place
            # (note: os.rename requires tmp and the repo on the same filesystem)
            ftmp = os.path.join(tmp, "ftmp")
            with open(ftmp, "wb") as f:
                resp = requests.get(url, stream=True)
                resp.raise_for_status()
                for chunk in resp.iter_content(chunk_size=256 * 1024):
                    f.write(chunk)
            os.makedirs(local_dir, exist_ok=True)
            os.rename(ftmp, local_path)
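
    # add_file receives the expected sha256 but, as written, only uses it to
    # choose the destination directory. A minimal verification sketch, hashing
    # the temp file before it is moved into place; illustrative and not yet
    # called from add_file:
    def _verify_sha256(self, ftmp, expected):
        import hashlib
        h = hashlib.sha256()
        with open(ftmp, "rb") as f:
            for chunk in iter(lambda: f.read(256 * 1024), b""):
                h.update(chunk)
        if h.hexdigest() != expected.lower():
            raise Exception("sha256 mismatch: expected {} got {}".format(expected, h.hexdigest()))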


class Dist(object):
    def __init__(self, repo, name):
        self.repo = repo
        self.name = name
        self.components = {}

    def get_component(self, name):
        if component := self.components.get(name):
            return component
        component = Component(self, name)
        self.components[name] = component
        return component

    def update_metadata(self, c, release):
        c.execute("REPLACE INTO dist_meta (dist, metadata) VALUES (?, ?);",
                  (self.name, message_to_string(release), ))


class Component(object):
    def __init__(self, dist, name):
        self.dist = dist
        self.name = name
        self.arches = {}

    def get_arch(self, name):
        if arch := self.arches.get(name):
            return arch
        arch = Arch(self, name)
        self.arches[name] = arch
        return arch


class Arch(object):
    def __init__(self, component, name):
        self.component = component
        self.name = name

    def add_package(self, c, metadata):
        # insert the package into the pool
        # return True if we need to fetch the file
        c.execute("SELECT * FROM packages WHERE name=? AND version=? AND arch=?;",
                  (metadata["Package"], metadata["Version"], metadata["Architecture"], ))
        row = c.fetchone()
        if not row:
            c.execute("INSERT INTO packages (name, version, arch, fname, sha256, metadata) VALUES (?, ?, ?, ?, ?, ?);",
                      (metadata["Package"],
                       metadata["Version"],
                       metadata["Architecture"],
                       os.path.basename(metadata["Filename"]),
                       metadata["SHA256"],
                       message_to_string(metadata), ))
        # insert the package into the dist
        c.execute("REPLACE INTO repo_package (dist, component, arch, name, version) VALUES (?, ?, ?, ?, ?);",
                  (self.component.dist.name,
                   self.component.name,
                   self.name,
                   metadata["Package"],
                   metadata["Version"], ))
        return not row


def cmd_mirror(args, parser):
    """
    Create a repo
    - containing all the packages from the db
    - containing a subset of packages based on some query
    - containing a subset of packages matching an existing repo
    """
    repo = Repo(args.database)
    # filter the packages
    #TODO support actual filtering; for now, take everything
    package_query = """SELECT * FROM repo_package;"""
    package_query_params = ()
    # (dist,component) -> list((arch,name,version))
    packages = defaultdict(list)
    with closing(repo.db.cursor()) as c:
        c.execute(package_query, package_query_params)
        for row in c:
            packages[(row["dist"], row["component"], )].append(
                (row["arch"], row["name"], row["version"], ))
    packages = dict(packages)
    # build the metadata files
    """
    we need to build a structure like:
        /pool/<component>/<letter>/<source>/whatever.deb
            * source is a field in the package's metadata
              TODO add it to the db
    and
        /dists/<dist>/
            Release (hashes of everything in ./<component>/)
            InRelease (the above, but as a gpg signed message)
            Release.gpg (pgp signature for the Release file)
            Contents-<arch>.gz
                (optional? some kind of index of package contents)
                we'll need to skip it for now anyway as we aren't importing it
        /dists/<dist>/<component>/<arch>/
            Release (very small identifier file)
            Packages (the metadata of all the packages we'll include)

    procedure (see the sketch after this function):
        for each component:
            for each package:
                link the package file into place, if needed
        for each dist:
            for each component:
                for each arch:
                    generate Packages file
            generate the Release / InRelease metadata file
    """
    import pdb
    pdb.set_trace()
    # sign the files
    # put the packages in place
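

# A minimal sketch of the "generate Packages file" step described in
# cmd_mirror's procedure comment, assuming entries is one of the
# (arch, name, version) tuple lists built above. The function name, the
# lookup against the packages table, and the output layout are illustrative.
def write_packages_index(repo, out_dir, arch, entries):
    with closing(repo.db.cursor()) as c, open(os.path.join(out_dir, "Packages"), "w") as f:
        for entry_arch, name, version in entries:
            if entry_arch != arch:
                continue
            c.execute("SELECT metadata FROM packages WHERE name=? AND version=? AND arch=?;",
                      (name, version, arch, ))
            row = c.fetchone()
            # stanzas in a Packages file are separated by blank lines
            f.write(row["metadata"] + "\n\n")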


def cmd_import(args, parser):
    if not args.line:
        print("--file not yet supported")
        return
    line = Repoline.parse(args.line)
    r = Repo(args.database)
    # phase 1: get the metadata
    if not args.debs:
        r.import_source_metadata(line)
    # phase 2: get the .deb files
    if not args.meta:
        r.import_source_packages(line)


def cmd_shell(args, parser):
    # drop into a debugger with the repo opened for poking around
    repo = Repo(args.database)
    import pdb
    pdb.set_trace()


def main():
    parser = argparse.ArgumentParser(description="apt repo mirroring tool")
    parser.add_argument("--database", required=True, help="package database path")
    sp_action = parser.add_subparsers(dest="action", required=True, help="action to take")

    p_shell = sp_action.add_parser("shell", help="interactive shell")
    p_shell.set_defaults(func=cmd_shell)

    p_mirror = sp_action.add_parser("mirror", help="deploy a repo")
    p_mirror.set_defaults(func=cmd_mirror)

    p_ingest = sp_action.add_parser("ingest", help="import packages from existing repos")
    p_ingest.set_defaults(func=cmd_import)
    ingest_source = p_ingest.add_mutually_exclusive_group(required=True)
    ingest_source.add_argument("--line", help="import packages from a single apt sources.list source")
    ingest_source.add_argument("--file", help="import packages from all sources in the given sources.list file")
    ingest_method = p_ingest.add_mutually_exclusive_group()
    ingest_method.add_argument("--meta", action="store_true", help="only import metadata")
    ingest_method.add_argument("--debs", action="store_true", help="only download packages")

    args = parser.parse_args()
    args.func(args, parser)
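

# Example invocations, assuming the module is run directly as a script:
#   python cli2.py --database ./testrepo/ ingest --meta \
#       --line 'deb http://archive.ubuntu.com/ubuntu/ focal main restricted'
#   python cli2.py --database ./testrepo/ shell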


# scratch from development:
# r = Repo("./testrepo/")
# r = Repo("./testef/")
# focal = r.get_dist("focal")
# focal_main = focal.get_component("main")
# focal_main_x64 = focal_main.get_arch("binary-amd64")
# focal_main_x64.add_package("x", "y")
# r.deploy(path="./www/")
# r.import_source('deb http://archive.ubuntu.com/ubuntu/ focal main restricted')
# r.import_source('deb http://artifact.scc.net.davepedu.com/repo/apt/extpython/ focal main')
# import pdb
# pdb.set_trace()


if __name__ == "__main__":
    main()