add support for parsing and validating debian source files
This commit is contained in:
parent
e739492714
commit
0a9d1aeb74
130
README.md
130
README.md
|
@ -1,7 +1,6 @@
|
|||
[![Build Status](https://travis-ci.org/TheClimateCorporation/python-dpkg.svg?branch=master)](https://travis-ci.org/TheClimateCorporation/python-dpkg)
|
||||
|
||||
python-dpkg
|
||||
===========
|
||||
# python-dpkg
|
||||
|
||||
This library can be used to:
|
||||
|
||||
|
@ -12,6 +11,10 @@ This library can be used to:
|
|||
the algorithm described at
|
||||
https://www.debian.org/doc/debian-policy/ch-controlfields.html#s-f-Version
|
||||
|
||||
3. Parse debian source description (dsc) files, inspect their contents
|
||||
and verify that their source files are present and checksums are
|
||||
correct.
|
||||
|
||||
This is primarily intended for use on platforms that do not normally
|
||||
ship [python-apt](http://apt.alioth.debian.org/python-apt-doc/) due to
|
||||
licensing restrictions or the lack of a native libapt.so (e.g. macOS)
|
||||
|
@ -20,8 +23,7 @@ Currently only tested on CPython 2.7 and 3.5, but at least in theory should run
|
|||
on any python distribution that can install the [arpy](https://pypi.python.org/pypi/arpy/)
|
||||
library.
|
||||
|
||||
Installing
|
||||
==========
|
||||
## Installing
|
||||
|
||||
Install the 'pydpkg' package from [PyPi](https://pypi.python.org) using
|
||||
the [pip](https://packaging.python.org/installing/) tool:
|
||||
|
@ -32,11 +34,11 @@ the [pip](https://packaging.python.org/installing/) tool:
|
|||
Installing collected packages: pydpkg
|
||||
Successfully installed pydpkg-1.1
|
||||
|
||||
Usage
|
||||
=====
|
||||
## Usage
|
||||
|
||||
Read and extract headers
|
||||
------------------------
|
||||
### Binary Packages
|
||||
|
||||
#### Read and extract headers
|
||||
|
||||
>>> from pydpkg import Dpkg
|
||||
>>> dp = Dpkg('/tmp/testdeb_1:0.0.0-test_all.deb')
|
||||
|
@ -55,16 +57,14 @@ Read and extract headers
|
|||
Description: testdeb
|
||||
a bogus debian package for testing dpkg builds
|
||||
|
||||
Interact directly with the package control message
|
||||
--------------------------------------------------
|
||||
#### Interact directly with the package control message
|
||||
|
||||
>>> dp.message
|
||||
<email.message.Message instance at 0x10895c6c8>
|
||||
>>> dp.message.get_content_type()
|
||||
'text/plain'
|
||||
|
||||
Get package file fingerprints
|
||||
-----------------------------
|
||||
#### Get package file fingerprints
|
||||
|
||||
>>> dp.fileinfo
|
||||
{'sha256': '547500652257bac6f6bc83f0667d0d66c8abd1382c776c4de84b89d0f550ab7f', 'sha1': 'a5d28ae2f23e726a797349d7dd5f21baf8aa02b4', 'filesize': 910, 'md5': '149e61536a9fe36374732ec95cf7945d'}
|
||||
|
@ -77,8 +77,7 @@ Get package file fingerprints
|
|||
>>> dp.filesize
|
||||
910
|
||||
|
||||
Get the components of the package version
|
||||
-----------------------------------------
|
||||
#### Get the components of the package version
|
||||
|
||||
>>> d.epoch
|
||||
1
|
||||
|
@ -87,8 +86,7 @@ Get the components of the package version
|
|||
>>> d.debian_revision
|
||||
u'test'
|
||||
|
||||
Get an arbitrary control header, case-independent
|
||||
-------------------------------------------------
|
||||
#### Get an arbitrary control header, case-independent
|
||||
|
||||
>>> d.version
|
||||
u'1:0.0.0-test'
|
||||
|
@ -102,8 +100,7 @@ Get an arbitrary control header, case-independent
|
|||
>>> d.get('nosuchheader', 'default')
|
||||
'default'
|
||||
|
||||
Compare current version to a candidate version
|
||||
----------------------------------------------
|
||||
#### Compare current version to a candidate version
|
||||
|
||||
>>> dp.compare_version_with('1.0')
|
||||
1
|
||||
|
@ -111,8 +108,7 @@ Compare current version to a candidate version
|
|||
>>> dp.compare_version_with('1:1.0')
|
||||
-1
|
||||
|
||||
Compare two arbitrary version strings
|
||||
-------------------------------------
|
||||
#### Compare two arbitrary version strings
|
||||
|
||||
>>> from pydpkg import Dpkg
|
||||
>>> ver_1 = '0:1.0-test1'
|
||||
|
@ -120,15 +116,13 @@ Compare two arbitrary version strings
|
|||
>>> Dpkg.compare_versions(ver_1, ver_2)
|
||||
-1
|
||||
|
||||
Use as a key function to sort a list of version strings
|
||||
-------------------------------------------------------
|
||||
#### Use as a key function to sort a list of version strings
|
||||
|
||||
>>> from pydpkg import Dpkg
|
||||
>>> sorted(['0:1.0-test1', '1:0.0-test0', '0:1.0-test2'] , key=Dpkg.compare_versions_key)
|
||||
['0:1.0-test1', '0:1.0-test2', '1:0.0-test0']
|
||||
|
||||
Use the `dpkg-inspect.py` script to inspect packages
|
||||
----------------------------------------------------
|
||||
#### Use the `dpkg-inspect.py` script to inspect packages
|
||||
|
||||
$ dpkg-inspect.py ~/testdeb*deb
|
||||
Filename: /Home/n/testdeb_1:0.0.0-test_all.deb
|
||||
|
@ -147,3 +141,91 @@ Use the `dpkg-inspect.py` script to inspect packages
|
|||
Description: testdeb
|
||||
a bogus debian package for testing dpkg builds
|
||||
|
||||
### Source Packages
|
||||
|
||||
#### Read and extract headers
|
||||
|
||||
>>> from pydpkg import Dsc
|
||||
>>> dsc = Dsc('testdeb_0.0.0.dsc')
|
||||
>>> dsc.headers
|
||||
{'Uploaders': 'Nathan J. Mehl <n@climate.com>', 'Binary': 'testdeb', 'Maintainer': 'Nathan J. Mehl <n@climate.com>', 'Format': '3.0 (quilt)', 'Build-Depends': 'python (>= 2.6.6-3), debhelper (>= 9)', 'Source': 'testdeb', 'Version': '0.0.0-1', 'Standards-Version': '3.9.6', 'Architecture': 'all', 'Files': ' 142ca7334ed1f70302b4504566e0c233 280 testdeb_0.0.0.orig.tar.gz\n fc80e6e7f1c1a08b78a674aaee6c1548 232 testdeb_0.0.0-1.debian.tar.xz', 'Checksums-Sha1': ' f250ac0a426b31df24fc2c98050f4fab90e456cd 280 testdeb_0.0.0.orig.tar.gz\n cb3474ff94053018957ebcf1d8a2b45f75dda449 232 testdeb_0.0.0-1.debian.tar.xz', 'Package-List': 'testdeb', 'Homepage': 'https://github.com/TheClimateCorporation', 'Checksums-Sha256': ' aa57ba8f29840383f5a96c5c8f166a9e6da7a484151938643ce2618e82bfeea7 280 testdeb_0.0.0.orig.tar.gz\n 1ddb2a7336a99bc1d203f3ddb59f6fa2d298e90cb3e59cccbe0c84e359979858 232 testdeb_0.0.0-1.debian.tar.xz'}
|
||||
|
||||
#### Interact directly with the dsc message
|
||||
|
||||
>>> dsc.message
|
||||
<email.message.Message instance at 0x106fedea8>
|
||||
>>> dsc.message.get_content_type()
|
||||
'text/plain'
|
||||
>>> dsc.message.get('uploaders')
|
||||
'Nathan J. Mehl <n@climate.com>'
|
||||
|
||||
#### Render the dsc message as a string
|
||||
|
||||
>>> print(dsc.message_str)
|
||||
Format: 3.0 (quilt)
|
||||
Source: testdeb
|
||||
Binary: testdeb
|
||||
Architecture: all
|
||||
Version: 0.0.0-1
|
||||
Maintainer: Nathan J. Mehl <n@climate.com>
|
||||
Uploaders: Nathan J. Mehl <n@climate.com>
|
||||
Homepage: https://github.com/TheClimateCorporation
|
||||
Standards-Version: 3.9.6
|
||||
Build-Depends: python (>= 2.6.6-3), debhelper (>= 9)
|
||||
Package-List: testdeb
|
||||
Checksums-Sha1: f250ac0a426b31df24fc2c98050f4fab90e456cd 280
|
||||
testdeb_0.0.0.orig.tar.gz
|
||||
cb3474ff94053018957ebcf1d8a2b45f75dda449 232 testdeb_0.0.0-1.debian.tar.xz
|
||||
Checksums-Sha256: aa57ba8f29840383f5a96c5c8f166a9e6da7a484151938643ce2618e82bfeea7 280
|
||||
testdeb_0.0.0.orig.tar.gz
|
||||
1ddb2a7336a99bc1d203f3ddb59f6fa2d298e90cb3e59cccbe0c84e359979858 232
|
||||
testdeb_0.0.0-1.debian.tar.xz
|
||||
Files: 142ca7334ed1f70302b4504566e0c233 280 testdeb_0.0.0.orig.tar.gz
|
||||
fc80e6e7f1c1a08b78a674aaee6c1548 232 testdeb_0.0.0-1.debian.tar.xz
|
||||
|
||||
#### List the package source files from the dsc
|
||||
|
||||
>>> dsc.files
|
||||
['/home/n/testdeb_0.0.0.orig.tar.gz', 'home/n/testdeb_0.0.0-1.debian.tar.xz']
|
||||
|
||||
#### Validate that the package source files are present
|
||||
|
||||
>>> dsc.missing_files
|
||||
[]
|
||||
>>> dsc.all_files_present
|
||||
True
|
||||
>>> dsc.validate()
|
||||
>>>
|
||||
|
||||
>>> bad = Dsc('testdeb_1.1.1-bad.dsc')
|
||||
>>> bad.missing_files
|
||||
['/home/n/testdeb_1.1.1.orig.tar.gz', '/home/n/testdeb_1.1.1-1.debian.tar.xz']
|
||||
>>> bad.all_files_present
|
||||
False
|
||||
>>> bad.validate()
|
||||
pydpkg.DscMissingFileError: ['/home/n/testdeb_1.1.1.orig.tar.gz', '/home/n/testdeb_1.1.1-1.debian.tar.xz']
|
||||
|
||||
#### Inspect the source file checksums from the dsc
|
||||
|
||||
>>> pp(dsc.checksums)
|
||||
{'sha1': {'/home/n/testdeb_0.0.0-1.debian.tar.xz': 'cb3474ff94053018957ebcf1d8a2b45f75dda449',
|
||||
'/home/n/testdeb_0.0.0.orig.tar.gz': 'f250ac0a426b31df24fc2c98050f4fab90e456cd'},
|
||||
'sha256': {'/home/n/testdeb_0.0.0-1.debian.tar.xz': '1ddb2a7336a99bc1d203f3ddb59f6fa2d298e90cb3e59cccbe0c84e359979858',
|
||||
'/home/n/testdeb_0.0.0.orig.tar.gz': 'aa57ba8f29840383f5a96c5c8f166a9e6da7a484151938643ce2618e82bfeea7'}}
|
||||
|
||||
#### Validate that all source file checksums are correct
|
||||
|
||||
>>> dsc.corrected_checksums
|
||||
{}
|
||||
>>> dsc.all_checksums_correct
|
||||
True
|
||||
>>> dsc.validate()
|
||||
>>>
|
||||
|
||||
>>> bad = Dsc('testdeb_0.0.0-badchecksums.dsc')
|
||||
>>> bad.corrected_checksums
|
||||
{'sha256': defaultdict(None, {'/home/n/testdeb_0.0.0-1.debian.tar.xz': '1ddb2a7336a99bc1d203f3ddb59f6fa2d298e90cb3e59cccbe0c84e359979858', '/home/n/testdeb_0.0.0.orig.tar.gz': 'aa57ba8f29840383f5a96c5c8f166a9e6da7a484151938643ce2618e82bfeea7'}), 'sha1': defaultdict(None, {'/home/n/testdeb_0.0.0-1.debian.tar.xz': 'cb3474ff94053018957ebcf1d8a2b45f75dda449', '/home/n/testdeb_0.0.0.orig.tar.gz': 'f250ac0a426b31df24fc2c98050f4fab90e456cd'})}
|
||||
>>> bad.all_checksums_correct
|
||||
False
|
||||
>>> bad.validate()
|
||||
pydpkg.DscBadChecksumsError: {'sha256': defaultdict(None, {'/home/n/testdeb_0.0.0-1.debian.tar.xz': '1ddb2a7336a99bc1d203f3ddb59f6fa2d298e90cb3e59cccbe0c84e359979858', '/home/n/testdeb_0.0.0.orig.tar.gz': 'aa57ba8f29840383f5a96c5c8f166a9e6da7a484151938643ce2618e82bfeea7'}), 'sha1': defaultdict(None, {'/home/n/testdeb_0.0.0-1.debian.tar.xz': 'cb3474ff94053018957ebcf1d8a2b45f75dda449', '/home/n/testdeb_0.0.0.orig.tar.gz': 'f250ac0a426b31df24fc2c98050f4fab90e456cd'})}
|
||||
|
|
|
@ -6,17 +6,20 @@
|
|||
from __future__ import absolute_import
|
||||
|
||||
# stdlib imports
|
||||
import hashlib
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import tarfile
|
||||
|
||||
from collections import defaultdict
|
||||
from gzip import GzipFile
|
||||
from hashlib import md5, sha1, sha256
|
||||
from email import message_from_string as Message
|
||||
from email import message_from_string, message_from_file
|
||||
from functools import cmp_to_key
|
||||
|
||||
# pypi imports
|
||||
import six
|
||||
import pgpy
|
||||
from arpy import Archive
|
||||
|
||||
REQUIRED_HEADERS = ('package', 'version', 'architecture')
|
||||
|
@ -25,11 +28,16 @@ logging.basicConfig()
|
|||
|
||||
|
||||
class DpkgError(Exception):
|
||||
"""Base error class for pydpkg"""
|
||||
"""Base error class for Dpkg errors"""
|
||||
pass
|
||||
|
||||
|
||||
class DpkgVersionError(Exception):
|
||||
class DscError(Exception):
|
||||
"""Base error class for Dsc errors"""
|
||||
pass
|
||||
|
||||
|
||||
class DpkgVersionError(DpkgError):
|
||||
"""Corrupt or unparseable version string"""
|
||||
pass
|
||||
|
||||
|
@ -49,6 +57,21 @@ class DpkgMissingRequiredHeaderError(DpkgError):
|
|||
pass
|
||||
|
||||
|
||||
class DscMissingFileError(DscError):
|
||||
"""We were not able to find some of the files listed in the dsc"""
|
||||
pass
|
||||
|
||||
|
||||
class DscBadChecksumsError(DscError):
|
||||
"""Some of the files in the dsc have incorrect checksums"""
|
||||
pass
|
||||
|
||||
|
||||
class DscBadSignatureError(DscError):
|
||||
"""A dsc file has an invalid openpgp signature(s)"""
|
||||
pass
|
||||
|
||||
|
||||
# pylint: disable=too-many-instance-attributes,too-many-public-methods
|
||||
class Dpkg(object):
|
||||
|
||||
|
@ -152,9 +175,9 @@ class Dpkg(object):
|
|||
:returns: dict
|
||||
"""
|
||||
if self._fileinfo is None:
|
||||
h_md5 = md5()
|
||||
h_sha1 = sha1()
|
||||
h_sha256 = sha256()
|
||||
h_md5 = hashlib.md5()
|
||||
h_sha1 = hashlib.sha1()
|
||||
h_sha256 = hashlib.sha256()
|
||||
with open(self.filename, 'rb') as dpkg_file:
|
||||
for chunk in iter(lambda: dpkg_file.read(128), b''):
|
||||
h_md5.update(chunk)
|
||||
|
@ -296,7 +319,7 @@ class Dpkg(object):
|
|||
# py27 lacks email.message_from_bytes, so...
|
||||
if isinstance(message_body, bytes):
|
||||
message_body = message_body.decode('utf-8')
|
||||
message = Message(message_body)
|
||||
message = message_from_string(message_body)
|
||||
self._log.debug('got control message: %s', message)
|
||||
|
||||
for req in REQUIRED_HEADERS:
|
||||
|
@ -535,3 +558,245 @@ class Dpkg(object):
|
|||
function to a function suitable to passing to sorted() and friends
|
||||
as a key."""
|
||||
return cmp_to_key(Dpkg.dstringcmp)(x)
|
||||
|
||||
|
||||
class Dsc(object):
|
||||
"""Class allowing import and manipulation of a debian source
|
||||
description (dsc) file."""
|
||||
def __init__(self, filename=None, logger=None):
|
||||
self.filename = os.path.expanduser(filename)
|
||||
self._dirname = os.path.dirname(self.filename)
|
||||
self._log = logger or logging.getLogger(__name__)
|
||||
self._message = None
|
||||
self._files = None
|
||||
self._sizes = None
|
||||
self._message_str = None
|
||||
self._checksums = None
|
||||
self._corrected_checksums = None
|
||||
self._pgp_message = None
|
||||
|
||||
def __repr__(self):
|
||||
return repr(self.message_str)
|
||||
|
||||
def __str__(self):
|
||||
return six.text_type(self.message_str)
|
||||
|
||||
def __getattr__(self, attr):
|
||||
"""Overload getattr to treat message headers as object
|
||||
attributes (so long as they do not conflict with an existing
|
||||
attribute).
|
||||
|
||||
:param attr: string
|
||||
:returns: string
|
||||
:raises: AttributeError
|
||||
"""
|
||||
# handle attributes with dashes :-(
|
||||
munged = attr.replace('_', '-')
|
||||
# beware: email.Message[nonexistent] returns None not KeyError
|
||||
if munged in self.message:
|
||||
return self.message[munged]
|
||||
else:
|
||||
raise AttributeError("'Dsc' object has no attribute '%s'" % attr)
|
||||
|
||||
def __getitem__(self, item):
|
||||
"""Overload getitem to treat the message plus our local
|
||||
properties as items.
|
||||
|
||||
:param item: string
|
||||
:returns: string
|
||||
:raises: KeyError
|
||||
"""
|
||||
try:
|
||||
return getattr(self, item)
|
||||
except AttributeError:
|
||||
try:
|
||||
return self.__getattr__(item)
|
||||
except AttributeError:
|
||||
raise KeyError(item)
|
||||
|
||||
def get(self, item, ret=None):
|
||||
"""Public wrapper for getitem"""
|
||||
try:
|
||||
return self.__getitem__(item)
|
||||
except KeyError:
|
||||
return ret
|
||||
|
||||
@property
|
||||
def message(self):
|
||||
"""Return an email.Message object containing the parsed dsc file"""
|
||||
if self._message is None:
|
||||
self._message = self._process_dsc_file()
|
||||
return self._message
|
||||
|
||||
@property
|
||||
def headers(self):
|
||||
"""Return a dictionary of the message items"""
|
||||
if self._message is None:
|
||||
self._message = self._process_dsc_file()
|
||||
return dict(self._message.items())
|
||||
|
||||
@property
|
||||
def pgp_message(self):
|
||||
"""Return a pgpy.PGPMessage object containing the signed dsc
|
||||
message (or None if the message is unsigned)"""
|
||||
if self._message is None:
|
||||
self._message = self._process_dsc_file()
|
||||
return self._pgp_message
|
||||
|
||||
@property
|
||||
def files(self):
|
||||
"""Return a list of source files found in the dsc file"""
|
||||
if self._files is None:
|
||||
self._files = self._process_source_files()
|
||||
return [x[0] for x in self._files]
|
||||
|
||||
@property
|
||||
def all_files_present(self):
|
||||
"""Return true if all files listed in the dsc have been found"""
|
||||
if self._files is None:
|
||||
self._files = self._process_source_files()
|
||||
return all([x[2] for x in self._files])
|
||||
|
||||
@property
|
||||
def all_checksums_correct(self):
|
||||
"""Return true if all checksums are correct"""
|
||||
return not self.corrected_checksums
|
||||
|
||||
@property
|
||||
def corrected_checksums(self):
|
||||
"""Returns a dict of the CORRECT checksums in any case
|
||||
where the ones provided by the dsc file are incorrect."""
|
||||
if self._corrected_checksums is None:
|
||||
self._corrected_checksums = self._validate_checksums()
|
||||
return self._corrected_checksums
|
||||
|
||||
@property
|
||||
def missing_files(self):
|
||||
"""Return a list of all files from the dsc that we failed to find"""
|
||||
if self._files is None:
|
||||
self._files = self._process_source_files()
|
||||
return [x[0] for x in self._files if x[2] is False]
|
||||
|
||||
@property
|
||||
def sizes(self):
|
||||
"""Return a list of source files found in the dsc file"""
|
||||
if self._files is None:
|
||||
self._files = self._process_source_files()
|
||||
return dict([(x[0], x[1]) for x in self._files])
|
||||
|
||||
@property
|
||||
def message_str(self):
|
||||
"""Return the dsc message as a string
|
||||
|
||||
:returns: string
|
||||
"""
|
||||
if self._message_str is None:
|
||||
self._message_str = self.message.as_string()
|
||||
return self._message_str
|
||||
|
||||
@property
|
||||
def checksums(self):
|
||||
"""Return a dictionary of checksums for the source files found
|
||||
in the dsc file, keyed first by hash type and then by filename."""
|
||||
if self._checksums is None:
|
||||
self._checksums = self._process_checksums()
|
||||
return self._checksums
|
||||
|
||||
def validate(self):
|
||||
"""Raise exceptions if files are missing or checksums are bad."""
|
||||
if not self.all_files_present:
|
||||
raise DscMissingFileError(
|
||||
[x[0] for x in self._files if not x[2]])
|
||||
if not self.all_checksums_correct:
|
||||
raise DscBadChecksumsError(self.corrected_checksums)
|
||||
|
||||
def _process_checksums(self):
|
||||
"""Walk through the dsc message looking for any keys in the
|
||||
format 'Checksum-hashtype'. Return a nested dictionary in
|
||||
the form {hashtype: {filename: {digest}}}"""
|
||||
sums = {}
|
||||
for key in self.message.keys():
|
||||
if key.lower().startswith('checksums'):
|
||||
hashtype = key.split('-')[1].lower()
|
||||
sums[hashtype] = {}
|
||||
source = self.message[key]
|
||||
for line in source.split('\n'):
|
||||
if line:
|
||||
digest, _, filename = line.strip().split(' ')
|
||||
pathname = os.path.abspath(
|
||||
os.path.join(self._dirname, filename))
|
||||
sums[hashtype][pathname] = digest
|
||||
return sums
|
||||
|
||||
def _process_dsc_file(self):
|
||||
"""Extract the dsc message from a file: parse the dsc body
|
||||
and return an email.Message object. Attempt to extract the
|
||||
RFC822 message from an OpenPGP message if necessary."""
|
||||
if not self.filename.endswith('.dsc'):
|
||||
self._log.warning(
|
||||
'File %s does not appear to be a dsc file; pressing '
|
||||
'on but we may experience some turbulence and possibly '
|
||||
'explode.', self.filename)
|
||||
try:
|
||||
self._pgp_message = pgpy.PGPMessage.from_file(self.filename)
|
||||
msg = message_from_string(self._pgp_message.message)
|
||||
except TypeError as ex:
|
||||
self._log.exception(ex)
|
||||
self._log.fatal(
|
||||
'dsc file %s has a corrupt signature: %s', self.filename, ex)
|
||||
raise DscBadSignatureError
|
||||
# '%s has a corrupt signature' % self.filename)
|
||||
except IOError as ex:
|
||||
self._log.fatal('Could not read dsc file "%s": %s',
|
||||
self.filename, ex)
|
||||
raise
|
||||
except (ValueError, pgpy.errors.PGPError) as ex:
|
||||
self._log.warning('dsc file %s is not signed: %s',
|
||||
self.filename, ex)
|
||||
with open(self.filename) as fileobj:
|
||||
msg = message_from_file(fileobj)
|
||||
return msg
|
||||
|
||||
def _process_source_files(self):
|
||||
"""Walk through the list of lines in the 'Files' section of
|
||||
the dsc message, and verify that the file exists in the same
|
||||
location on our filesystem as the dsc file. Return a list
|
||||
of tuples: the normalized pathname for the file, the
|
||||
size of the file (as claimed by the dsc) and whether the file
|
||||
is actually present in the filesystem locally.
|
||||
|
||||
Also extract the file size from the message lines and fill
|
||||
out the _files dictionary.
|
||||
"""
|
||||
filenames = []
|
||||
try:
|
||||
files = self.message['Files']
|
||||
except KeyError:
|
||||
self._log.fatal('DSC file "%s" does not have a Files section',
|
||||
self.filename)
|
||||
raise
|
||||
for line in files.split('\n'):
|
||||
if line:
|
||||
_, size, filename = line.strip().split(' ')
|
||||
pathname = os.path.abspath(
|
||||
os.path.join(self._dirname, filename))
|
||||
filenames.append(
|
||||
(pathname, int(size), os.path.isfile(pathname)))
|
||||
return filenames
|
||||
|
||||
def _validate_checksums(self):
|
||||
"""Iterate over the dict of asserted checksums from the
|
||||
dsc file. Check each in turn. If any checksum is invalid,
|
||||
append the correct checksum to a similarly structured dict
|
||||
and return them all at the end."""
|
||||
bad_hashes = defaultdict(lambda: defaultdict(None))
|
||||
for hashtype, filenames in six.iteritems(self.checksums):
|
||||
for filename, digest in six.iteritems(filenames):
|
||||
hasher = getattr(hashlib, hashtype)()
|
||||
with open(filename, 'rb') as fileobj:
|
||||
# pylint: disable=cell-var-from-loop
|
||||
for chunk in iter(lambda: fileobj.read(128), b''):
|
||||
hasher.update(chunk)
|
||||
if hasher.hexdigest() != digest:
|
||||
bad_hashes[hashtype][filename] = hasher.hexdigest()
|
||||
return dict(bad_hashes)
|
||||
|
|
5
setup.py
5
setup.py
|
@ -1,6 +1,6 @@
|
|||
from distutils.core import setup
|
||||
|
||||
__VERSION__ = '1.2.1'
|
||||
__VERSION__ = '1.3.0'
|
||||
|
||||
setup(
|
||||
name='pydpkg',
|
||||
|
@ -14,7 +14,8 @@ setup(
|
|||
keywords=['apt', 'debian', 'dpkg', 'packaging'],
|
||||
install_requires=[
|
||||
'arpy==1.1.1',
|
||||
'six==1.10.0'
|
||||
'six==1.10.0',
|
||||
'PGPy==0.4.1'
|
||||
],
|
||||
extras_require={
|
||||
'test': ['pep8==1.7.0', 'pytest==3.1.1', 'pylint==1.7.1']
|
||||
|
|
|
@ -0,0 +1,90 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
import os
|
||||
import unittest
|
||||
import pytest
|
||||
from email.message import Message
|
||||
|
||||
from pydpkg import Dsc, DscMissingFileError, DscBadSignatureError, DscBadChecksumsError
|
||||
from pgpy import PGPMessage
|
||||
|
||||
|
||||
TEST_DSC_FILE = 'testdeb_0.0.0.dsc'
|
||||
TEST_SIGNED_DSC_FILE = 'testdeb_0.0.0.dsc.asc'
|
||||
TEST_BAD_DSC_FILE = 'testdeb_1.1.1-bad.dsc'
|
||||
TEST_BAD_SIGNED_FILE = 'testdeb_1.1.1-bad.dsc.asc'
|
||||
TEST_BAD_CHECKSUMS_FILE = 'testdeb_0.0.0-badchecksums.dsc'
|
||||
|
||||
|
||||
class DscTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
goodfile = os.path.join(os.path.dirname(__file__), TEST_DSC_FILE)
|
||||
signed = os.path.join(os.path.dirname(__file__), TEST_SIGNED_DSC_FILE)
|
||||
badfile = os.path.join(os.path.dirname(__file__), TEST_BAD_DSC_FILE)
|
||||
badsigned = os.path.join(os.path.dirname(__file__), TEST_BAD_SIGNED_FILE)
|
||||
badchecksums = os.path.join(os.path.dirname(__file__), TEST_BAD_CHECKSUMS_FILE)
|
||||
self.good = Dsc(goodfile)
|
||||
self.signed = Dsc(signed)
|
||||
self.bad = Dsc(badfile)
|
||||
self.badsigned = Dsc(badsigned)
|
||||
self.badchecksums = Dsc(badchecksums)
|
||||
|
||||
def test_get_message_headers(self):
|
||||
self.assertEqual(self.good.source, 'testdeb')
|
||||
self.assertEqual(self.good.SOURCE, 'testdeb')
|
||||
self.assertEqual(self.good['source'], 'testdeb')
|
||||
self.assertEqual(self.good['SOURCE'], 'testdeb')
|
||||
self.assertEqual(self.good.get('source'), 'testdeb')
|
||||
self.assertEqual(self.good.get('SOURCE'), 'testdeb')
|
||||
self.assertEqual(self.good.get('nonexistent'), None)
|
||||
self.assertEqual(self.good.get('nonexistent', 'foo'), 'foo')
|
||||
|
||||
def test_attr_munging(self):
|
||||
self.assertEqual(self.good['package-list'], 'testdeb')
|
||||
self.assertEqual(self.good.package_list, 'testdeb')
|
||||
|
||||
def test_missing_header(self):
|
||||
self.assertRaises(KeyError, self.good.__getitem__, 'xyzzy')
|
||||
self.assertRaises(AttributeError, self.good.__getattr__, 'xyzzy')
|
||||
|
||||
def test_message(self):
|
||||
self.assertIsInstance(self.good.message, type(Message()))
|
||||
|
||||
def test_found_files(self):
|
||||
self.assertEqual(
|
||||
self.good.files,
|
||||
[os.path.join(os.path.dirname(__file__),
|
||||
'testdeb_0.0.0.orig.tar.gz'),
|
||||
os.path.join(os.path.dirname(__file__),
|
||||
'testdeb_0.0.0-1.debian.tar.xz')]
|
||||
)
|
||||
|
||||
def test_missing_files(self):
|
||||
self.assertEqual(True, self.good.all_files_present)
|
||||
self.assertEqual(False, self.bad.all_files_present)
|
||||
self.assertEqual(
|
||||
[os.path.join(os.path.dirname(__file__),
|
||||
'testdeb_1.1.1.orig.tar.gz'),
|
||||
os.path.join(os.path.dirname(__file__),
|
||||
'testdeb_1.1.1-1.debian.tar.xz')],
|
||||
self.bad.missing_files)
|
||||
with pytest.raises(DscMissingFileError):
|
||||
self.bad.validate()
|
||||
|
||||
def test_pgp_validation(self):
|
||||
self.assertEqual(None, self.good.pgp_message)
|
||||
self.assertEqual(self.signed.source, 'testdeb')
|
||||
with pytest.raises(DscBadSignatureError):
|
||||
self.badsigned.files
|
||||
self.assertIsInstance(self.signed.pgp_message, PGPMessage)
|
||||
|
||||
def test_checksum_validation(self):
|
||||
self.assertEqual(True, self.good.all_checksums_correct)
|
||||
self.assertEqual(False, self.badchecksums.all_checksums_correct)
|
||||
with pytest.raises(DscBadChecksumsError):
|
||||
self.badchecksums.validate()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
suite = unittest.TestLoader().loadTestsFromTestCase(DscTest)
|
||||
unittest.TextTestRunner(verbosity=2).run(suite)
|
Binary file not shown.
|
@ -0,0 +1,21 @@
|
|||
Format: 3.0 (quilt)
|
||||
Source: testdeb
|
||||
Binary: testdeb
|
||||
Architecture: all
|
||||
Version: 0.0.0-1
|
||||
Maintainer: Nathan J. Mehl <n@climate.com>
|
||||
Uploaders: Nathan J. Mehl <n@climate.com>
|
||||
Homepage: https://github.com/TheClimateCorporation
|
||||
Standards-Version: 3.9.6
|
||||
Build-Depends: python (>= 2.6.6-3), debhelper (>= 9)
|
||||
Package-List: testdeb
|
||||
Checksums-Sha1:
|
||||
250ac0a426b31df24fc2c98050f4fab90e456cd 280 testdeb_0.0.0.orig.tar.gz
|
||||
b3474ff94053018957ebcf1d8a2b45f75dda449 232 testdeb_0.0.0-1.debian.tar.xz
|
||||
Checksums-Sha256:
|
||||
a57ba8f29840383f5a96c5c8f166a9e6da7a484151938643ce2618e82bfeea7 280 testdeb_0.0.0.orig.tar.gz
|
||||
ddb2a7336a99bc1d203f3ddb59f6fa2d298e90cb3e59cccbe0c84e359979858 232 testdeb_0.0.0-1.debian.tar.xz
|
||||
Files:
|
||||
42ca7334ed1f70302b4504566e0c233 280 testdeb_0.0.0.orig.tar.gz
|
||||
c80e6e7f1c1a08b78a674aaee6c1548 232 testdeb_0.0.0-1.debian.tar.xz
|
||||
|
|
@ -0,0 +1,21 @@
|
|||
Format: 3.0 (quilt)
|
||||
Source: testdeb
|
||||
Binary: testdeb
|
||||
Architecture: all
|
||||
Version: 0.0.0-1
|
||||
Maintainer: Nathan J. Mehl <n@climate.com>
|
||||
Uploaders: Nathan J. Mehl <n@climate.com>
|
||||
Homepage: https://github.com/TheClimateCorporation
|
||||
Standards-Version: 3.9.6
|
||||
Build-Depends: python (>= 2.6.6-3), debhelper (>= 9)
|
||||
Package-List: testdeb
|
||||
Checksums-Sha1:
|
||||
f250ac0a426b31df24fc2c98050f4fab90e456cd 280 testdeb_0.0.0.orig.tar.gz
|
||||
cb3474ff94053018957ebcf1d8a2b45f75dda449 232 testdeb_0.0.0-1.debian.tar.xz
|
||||
Checksums-Sha256:
|
||||
aa57ba8f29840383f5a96c5c8f166a9e6da7a484151938643ce2618e82bfeea7 280 testdeb_0.0.0.orig.tar.gz
|
||||
1ddb2a7336a99bc1d203f3ddb59f6fa2d298e90cb3e59cccbe0c84e359979858 232 testdeb_0.0.0-1.debian.tar.xz
|
||||
Files:
|
||||
142ca7334ed1f70302b4504566e0c233 280 testdeb_0.0.0.orig.tar.gz
|
||||
fc80e6e7f1c1a08b78a674aaee6c1548 232 testdeb_0.0.0-1.debian.tar.xz
|
||||
|
|
@ -0,0 +1,34 @@
|
|||
-----BEGIN PGP SIGNED MESSAGE-----
|
||||
Hash: SHA256
|
||||
|
||||
Format: 3.0 (quilt)
|
||||
Source: testdeb
|
||||
Binary: testdeb
|
||||
Architecture: all
|
||||
Version: 0.0.0-1
|
||||
Maintainer: Nathan J. Mehl <n@climate.com>
|
||||
Uploaders: Nathan J. Mehl <n@climate.com>
|
||||
Homepage: https://github.com/TheClimateCorporation
|
||||
Standards-Version: 3.9.6
|
||||
Build-Depends: python (>= 2.6.6-3), debhelper (>= 9)
|
||||
Package-List: testdeb
|
||||
Checksums-Sha1:
|
||||
f250ac0a426b31df24fc2c98050f4fab90e456cd 280 testdeb_0.0.0.orig.tar.gz
|
||||
cb3474ff94053018957ebcf1d8a2b45f75dda449 232 testdeb_0.0.0-1.debian.tar.xz
|
||||
Checksums-Sha256:
|
||||
aa57ba8f29840383f5a96c5c8f166a9e6da7a484151938643ce2618e82bfeea7 280 testdeb_0.0.0.orig.tar.gz
|
||||
1ddb2a7336a99bc1d203f3ddb59f6fa2d298e90cb3e59cccbe0c84e359979858 232 testdeb_0.0.0-1.debian.tar.xz
|
||||
Files:
|
||||
142ca7334ed1f70302b4504566e0c233 280 testdeb_0.0.0.orig.tar.gz
|
||||
fc80e6e7f1c1a08b78a674aaee6c1548 232 testdeb_0.0.0-1.debian.tar.xz
|
||||
|
||||
-----BEGIN PGP SIGNATURE-----
|
||||
|
||||
iQEcBAEBCAAGBQJZSyK8AAoJEDBtfbmvfY+UTc4H/RMC3nwCM4JF227bxR2+uxTI
|
||||
UwYncODHRqrhwp31ntQNMEtahSBipY75ockymLSsZEkNYhDSE5WcLv5gDybpAt5P
|
||||
18BA7sn+K3Y99L7eNhX4Tsn4Gs+ip0Z1QdHhBHURMbGQfayqWgATV0z7Xg9Uo2GA
|
||||
hgolnXtpNNchl1RMjwR9e2urqQZhzbiwGhsigBnj8F+v4Gl7etr6BmOricIKp61w
|
||||
rBGNzXrGPR80xSdNr5iFwyRpKLvobAkJqbVxSlCLbRf5nXH1H96p7Q5EyaVEecdF
|
||||
X2sK95GJlwGveQV4hoe0Lq5c97q2qKo7ZaXIL8FRQkG545fS/HsictgetPkyIsw=
|
||||
=goqi
|
||||
-----END PGP SIGNATURE-----
|
Binary file not shown.
|
@ -0,0 +1,22 @@
|
|||
Format: 3.0 (quilt)
|
||||
Source: testdeb
|
||||
Binary: testdeb
|
||||
Architecture: all
|
||||
Version: 1.1.1-1
|
||||
Maintainer: Nathan J. Mehl <n@climate.com>
|
||||
Uploaders: Nathan J. Mehl <n@climate.com>
|
||||
Homepage: https://github.com/TheClimateCorporation
|
||||
Standards-Version: 3.9.6
|
||||
Build-Depends: python (>= 2.6.6-3), debhelper (>= 9)
|
||||
Package-List:
|
||||
testdeb deb admin optional arch=all
|
||||
Checksums-Sha1:
|
||||
f250ac0a426b31df24fc2c98050f4fab90e456cd 280 testdeb_1.1.1.orig.tar.gz
|
||||
cb3474ff94053018957ebcf1d8a2b45f75dda449 232 testdeb_1.1.1-1.debian.tar.xz
|
||||
Checksums-Sha256:
|
||||
aa57ba8f29840383f5a96c5c8f166a9e6da7a484151938643ce2618e82bfeea7 280 testdeb_1.1.1.orig.tar.gz
|
||||
1ddb2a7336a99bc1d203f3ddb59f6fa2d298e90cb3e59cccbe0c84e359979858 232 testdeb_1.1.1-1.debian.tar.xz
|
||||
Files:
|
||||
142ca7334ed1f70302b4504566e0c233 280 testdeb_1.1.1.orig.tar.gz
|
||||
fc80e6e7f1c1a08b78a674aaee6c1548 232 testdeb_1.1.1-1.debian.tar.xz
|
||||
|
|
@ -0,0 +1,34 @@
|
|||
-----BEGIN PGP SIGNED MESSAGE-----
|
||||
Hash: SHA256
|
||||
|
||||
Format: 3.0 (quilt)
|
||||
Source: testdeb
|
||||
Binary: testdeb
|
||||
Architecture: all
|
||||
Version: 1.1.1-1
|
||||
Maintainer: Nathan J. Mehl <n@climate.com>
|
||||
Uploaders: Nathan J. Mehl <n@climate.com>
|
||||
Homepage: https://github.com/TheClimateCorporation
|
||||
Standards-Version: 3.9.6
|
||||
Build-Depends: python (>= 2.6.6-3), debhelper (>= 9)
|
||||
Package-List:
|
||||
testdeb deb admin optional arch=all
|
||||
Checksums-Sha1:
|
||||
f250ac0a426b31df24fc2c98050f4fab90e456cd 280 testdeb_1.1.1.orig.tar.gz
|
||||
cb3474ff94053018957ebcf1d8a2b45f75dda449 232 testdeb_1.1.1-1.debian.tar.xz
|
||||
Checksums-Sha256:
|
||||
aa57ba8f29840383f5a96c5c8f166a9e6da7a484151938643ce2618e82bfeea7 280 testdeb_1.1.1.orig.tar.gz
|
||||
1ddb2a7336a99bc1d203f3ddb59f6fa2d298e90cb3e59cccbe0c84e359979858 232 testdeb_1.1.1-1.debian.tar.xz
|
||||
Files:
|
||||
142ca7334ed1f70302b4504566e0c233 280 testdeb_1.1.1.orig.tar.gz
|
||||
fc80e6e7f1c1a08b78a674aaee6c1548 232 testdeb_1.1.1-1.debian.tar.xz
|
||||
-----BEGIN PGP SIGNATURE-----
|
||||
|
||||
iQEcBAEBCAAGBQJZSyL/AAoJEDBtfbmvfY+UH6EIALzdWgPraB/8xrzSum/g600+
|
||||
lPGrObE1vgtH0IdkWyBhLwZKZGSgeEI7U11EOC+z78ULTTDafjeXZDWTPhO+F23d
|
||||
VFSOTHnzhqTtoqa1lT4qL2A6kWCc3QxpFtI6dLuZtXFlEEKk9w2tu/GzSWtuzVKQ
|
||||
qNRDdLsiFz7kPwMWjvlaqVjz5wqtY3j5TpYpVH8uqtrCuzaycGjzTgMLxHa+o2T+
|
||||
AHYnZLRN22hLb1DeFaZDkNiC6qgx45FV98jh0sYTZA7A15MbwCvNLD0uws4ICwPl
|
||||
8ex3e0rf4FlpUuMNJTQmShuxaMGlbwtzKFy75SxQxxJ0r+7TL4/B37vBE9+BJqs=
|
||||
=Of1F
|
||||
-----END PGP SIGNATURE-----
|
Loading…
Reference in New Issue