218 lines
6.6 KiB
Python
218 lines
6.6 KiB
Python
import os
|
|
import sqlite3
|
|
import requests
|
|
import gzip
|
|
import argparse
|
|
from contextlib import closing
|
|
from email import message_from_string
|
|
from dataclasses import dataclass
|
|
|
|
|
|
def dict_factory(c, row):
    """
    Row factory for sqlite3 connections: turn one result row into a dict.

    c is the cursor that produced the row; the first field of each entry in
    its description is used as the key. row is the sequence of column values,
    in the same order as the description.
    """
    return {column[0]: row[position] for position, column in enumerate(c.description)}
|
|
|
|
|
|
def get_db(db_path):
    """
    Open the package database stored in the directory db_path.

    The database lives in the file "packages.db" inside db_path. The directory
    is created when it does not exist yet, and the 'packages' table is created
    when it is missing.

    Returns a sqlite3 connection whose row_factory is dict_factory. If the
    schema cannot be set up the connection is closed and the error re-raised.
    """
    # sqlite3.connect() does not create missing parent directories, so a first
    # run against a fresh location would otherwise fail before doing anything
    if db_path:
        os.makedirs(db_path, exist_ok=True)

    db_file = os.path.join(db_path, "packages.db")
    db = sqlite3.connect(db_file)
    db.row_factory = dict_factory

    queries = [
        """CREATE TABLE IF NOT EXISTS 'packages' (
            'name' TEXT,
            'version' TEXT,
            'arch' TEXT,
            'fname' TEXT,
            'sha256' TEXT,
            'has_file' BOOLEAN DEFAULT(0),
            'metadata' TEXT, -- from ubuntu/dists/focal/main/binary-amd64/Packages.gz"
            UNIQUE(name, version, arch),
            UNIQUE(fname),
            UNIQUE(sha256)
        )""",
    ]

    try:
        with closing(db.cursor()) as c:
            for query in queries:
                c.execute(query)
    except Exception:
        # don't leak the open connection when the schema can't be created
        db.close()
        raise

    return db
|
|
|
|
|
|
def request_packages(url):
    """
    Fetch an apt "Packages" metadata file and return its contents as text.

    The "Packages" metadata file may be plain, or with the .gz or .xz
    extension. When url carries neither extension, each variant is requested
    in turn (.gz, .xz, then plain) until one is found. A url that already ends
    in .gz or .xz is requested as-is and decompressed accordingly.

    Returns the decompressed file decoded as UTF-8 text.
    Raises requests.HTTPError if no variant exists or a request fails with a
    status other than 404.
    """
    # local import: xz support is only needed here
    import lzma

    decompressors = ((".gz", gzip.decompress), (".xz", lzma.decompress))

    candidates = None
    for extension, unpack in decompressors:
        if url.endswith(extension):
            # the caller already chose a compressed variant
            candidates = [(url, unpack)]
            break
    if candidates is None:
        candidates = [(url + extension, unpack) for extension, unpack in decompressors]
        # plain file: the body is used unchanged
        candidates.append((url, bytes))

    for candidate_url, unpack in candidates:
        resp = requests.get(candidate_url)
        if resp.status_code == 404:
            # this variant is not published, try the next one
            continue
        resp.raise_for_status()
        return unpack(resp.content).decode()

    raise requests.HTTPError("no Packages index found for {}".format(url))
|
|
|
|
|
|
@dataclass
class Repoline:
    """
    Repoline represents one line in an apt sources.list file
    """
    # root URL of the repository; parse() always stores it with a trailing "/"
    base_url: str
    # architecture used for the "binary-<arch>" part of the index URLs
    arch: str
    # distribution name, e.g. "focal"
    dist: str
    # components named on the line, e.g. ["main", "restricted"]
    components: list[str]

    @property
    def packages_urls(self):
        """
        URL to the 'Packages' metadata file for each component, without any
        compression extension
        e.g. http://archive.ubuntu.com/ubuntu/dists/focal/main/binary-amd64/Packages

        Returns a dict mapping component name to URL.
        """
        return {
            component: "{}dists/{}/{}/binary-{}/Packages".format(
                self.base_url, self.dist, component, self.arch)
            for component in self.components
        }

    def fetch_packages_meta(self, component):
        """
        Download and parse the Packages metadata of one component.

        Returns a list with one email.message.Message per package stanza.
        Raises KeyError if component is not one of self.components.
        """
        url = self.packages_urls[component]
        stanzas = request_packages(url).split("\n\n")
        # Stanzas are separated by blank lines. Skip empty pieces instead of
        # blindly dropping the last one, so the final package survives when
        # the file does not end with a blank line.
        return [message_from_string(stanza.strip("\n")) for stanza in stanzas if stanza.strip()]

    @staticmethod
    def parse(line) -> "Repoline":
        """
        Parse 'deb [arch=xxx] http://archive.ubuntu.com/ubuntu/ focal main restricted'

        The bracketed options block is optional and may span several
        whitespace-separated tokens. When it contains 'arch=' the first listed
        architecture is used; otherwise amd64 is assumed.

        Raises ValueError if the line does not start with 'deb', has an
        unterminated options block, or lacks a URL or distribution.
        """
        tokens = line.split()

        # discard the 'deb' prefix
        if not tokens or tokens[0] != "deb":
            raise ValueError("expected deb line to start with 'deb' but got '{}'"
                             .format(tokens[0] if tokens else ""))
        tokens.pop(0)

        # assume amd64 unless the options say otherwise
        arch = "amd64"

        # consume '[arch=xxx ...]', which split() may have cut into several tokens
        if tokens and tokens[0].startswith("["):
            option_tokens = []
            while tokens:
                token = tokens.pop(0)
                option_tokens.append(token)
                if token.endswith("]"):
                    break
            else:
                raise ValueError("unterminated '[' options block in deb line")

            for option in " ".join(option_tokens).strip("[]").split():
                key, _, value = option.partition("=")
                if key == "arch" and value:
                    # a Repoline holds a single arch; take the first one listed
                    arch = value.split(",")[0]

        if len(tokens) < 2:
            raise ValueError("expected deb line to contain a url and a distribution")

        # now we have the base url
        base_url = tokens.pop(0)
        if not base_url.endswith("/"):
            base_url = base_url + "/"

        # and the dist
        dist = tokens.pop(0)

        # whatever is left are the components
        return Repoline(base_url=base_url, arch=arch, dist=dist, components=tokens)
|
|
|
|
|
|
def download_file(url, local_path, chunk_size=64 * 1024):
    """
    Download url and store the response body at local_path.

    url is fetched with a streaming GET and written in pieces of chunk_size
    bytes. The HTTP status is checked before local_path is opened, so a
    failed request does not create or truncate the file.

    Raises requests.HTTPError when the server answers with an error status.
    """
    print("downloading", url)

    # closing() releases the streamed connection even if writing fails
    with closing(requests.get(url, stream=True)) as resp:
        resp.raise_for_status()
        with open(local_path, "wb") as f:
            # iter_content() defaults to 1-byte chunks; ask for larger ones
            for chunk in resp.iter_content(chunk_size=chunk_size):
                f.write(chunk)
|
|
|
|
|
|
def cmd_ingest(args, parser):
    """
    Handler for the 'ingest' subcommand: import packages from an existing repo.

    Reads args.line (one sources.list line), args.database (database
    directory), args.meta (only import metadata) and args.debs (only download
    packages). parser is accepted for symmetry with other handlers and is not
    used. The database connection is closed before returning.
    """
    if not args.line:
        print("--file not yet supported")
        return

    repo = Repoline.parse(args.line)

    # closing() makes sure the connection is released even if a step fails
    with closing(get_db(args.database)) as db:
        if not args.debs:
            _ingest_metadata(db, repo)

        if not args.meta:
            _download_missing(db, repo, args.database)


def _ingest_metadata(db, repo):
    """Insert every package listed by repo's components that the db lacks."""
    with closing(db.cursor()) as c:
        for component in repo.components:
            print("fetching", component)
            for pkg in repo.fetch_packages_meta(component):
                identity = (pkg["Package"], pkg["Version"], pkg["Architecture"])

                c.execute("SELECT count(*) as count FROM packages WHERE name=? AND version=? AND arch=?;",
                          identity)

                # already known, nothing to do
                if c.fetchone()['count'] > 0:
                    continue

                # as_string() ends with the blank line that separates headers
                # from the (empty) body; [0:-2] removes it before storing
                c.execute("INSERT INTO packages (name, version, arch, fname, sha256, metadata) VALUES (?, ?, ?, ?, ?, ?);",
                          identity + (os.path.basename(pkg["Filename"]), pkg["SHA256"], pkg.as_string()[0:-2]))

    # one commit for the whole batch
    db.commit()


def _download_missing(db, repo, database_dir):
    """Download the file of every package row that has has_file=0."""
    #TODO parallelize downloads
    with closing(db.cursor()) as c:
        c.execute("SELECT count(*) as count FROM packages WHERE has_file=0;")
        print("need to download {} packages".format(c.fetchone()["count"]))
        c.execute("SELECT * FROM packages WHERE has_file=0;")
        to_download = c.fetchall()

        for row in to_download:
            meta = message_from_string(row["metadata"])
            # NOTE(review): the URL is built from the repo given on this run,
            # while the query above is not limited to that repo - confirm rows
            # ingested from another repository are handled as intended
            url = repo.base_url + meta["Filename"]

            # files are sharded into directories named after the first
            # character of their sha256
            local_dir = os.path.join(database_dir, "files", row["sha256"][0])
            os.makedirs(local_dir, exist_ok=True)
            local_path = os.path.join(local_dir, os.path.basename(meta["Filename"]))

            download_file(url, local_path)

            # commit per file so progress survives an interrupted run
            c.execute("UPDATE packages SET has_file=1 WHERE sha256=?;", (row["sha256"], ))
            db.commit()
|
|
|
|
|
|
def cmd_mirror(args, parser):
    """
    Create a repo (not implemented yet; currently does nothing).

    The created repo is meant to be one of:
    - all the packages from the db
    - a subset of packages based on some query
    - a subset of packages matching an existing repo
    """
    # planned steps:
    #   1. filter the packages
    #   2. build the metadata files
    #   3. sign the files
    #   4. put the packages in place
    return None
|
|
|
|
|
|
def main(argv=None):
    """
    Command line entry point: parse the arguments and run the chosen action.

    argv is the list of arguments to parse; when None, argparse reads them
    from sys.argv. Exits with a usage error when no action is given.
    """
    parser = argparse.ArgumentParser(description="apt repo mirroring tool")
    parser.add_argument("--database", required=True, help="package database path")

    # required=True: without an action there is no args.func to call, so let
    # argparse report a usage error instead of failing with AttributeError
    sp_action = parser.add_subparsers(dest="action", required=True, help="action to take")
    p_ingest = sp_action.add_parser("ingest", help="import packages from existing repos")
    p_ingest.set_defaults(func=cmd_ingest)

    # exactly one source of repo lines must be given
    ingest_source = p_ingest.add_mutually_exclusive_group(required=True)
    ingest_source.add_argument("--line", help="import packages from a single apt sources.list source")
    ingest_source.add_argument("--file", help="import packages all sources in the given sources.list file")

    # optionally limit the run to one of the two phases
    ingest_method = p_ingest.add_mutually_exclusive_group()
    ingest_method.add_argument("--meta", action="store_true", help="only import metadata")
    ingest_method.add_argument("--debs", action="store_true", help="only download packages")

    args = parser.parse_args(argv)

    args.func(args, parser)
|
|
|
|
|
|
# run the command line interface only when executed as a script
if __name__ == "__main__":
    main()
|