treat control message headers as attributes
also add docstrings and some rudimentary test coverage for the file importer
This commit is contained in:
parent
26081fcf3e
commit
3414f1f194
10
README.md
10
README.md
|
@ -77,6 +77,16 @@ Get package file fingerprints
|
|||
>>> dp.filesize
|
||||
910
|
||||
|
||||
Get the components of the package version
|
||||
-----------------------------------------
|
||||
|
||||
>>> d.epoch
|
||||
1
|
||||
>>> d.upstream_version
|
||||
u'0.0.0'
|
||||
>>> d.debian_revision
|
||||
u'test'
|
||||
|
||||
Get an arbitrary control header, case-independent
|
||||
-------------------------------------------------
|
||||
|
||||
|
|
|
@ -24,40 +24,42 @@ logging.basicConfig()
|
|||
|
||||
|
||||
class DpkgError(Exception):
|
||||
|
||||
"""Base error class for pydpkg"""
|
||||
pass
|
||||
|
||||
|
||||
class DpkgVersionError(Exception):
|
||||
|
||||
"""Corrupt or unparseable version string"""
|
||||
pass
|
||||
|
||||
|
||||
class DpkgMissingControlFile(DpkgError):
|
||||
|
||||
"""No control file found in control.tar.gz"""
|
||||
pass
|
||||
|
||||
|
||||
class DpkgMissingControlGzipFile(DpkgError):
|
||||
|
||||
"""No control.tar.gz file found in dpkg file"""
|
||||
pass
|
||||
|
||||
|
||||
class DpkgMissingRequiredHeaderError(DpkgError):
|
||||
|
||||
"""Corrupt package missing a required header"""
|
||||
pass
|
||||
|
||||
|
||||
# pylint: disable=too-many-instance-attributes,too-many-public-methods
|
||||
class Dpkg(object):
|
||||
|
||||
"""Class allowing import and manipulation of a debian package file."""
|
||||
|
||||
def __init__(self, filename=None, ignore_missing=False, logger=None):
|
||||
""" Constructor for Dpkg object
|
||||
|
||||
:param filename: string
|
||||
:param ignore_missing: bool
|
||||
:param logger: logging.Logger
|
||||
"""
|
||||
self.filename = os.path.expanduser(filename)
|
||||
self.ignore_missing = ignore_missing
|
||||
if not isinstance(self.filename, six.string_types):
|
||||
|
@ -69,6 +71,9 @@ class Dpkg(object):
|
|||
self._control_str = None
|
||||
self._headers = None
|
||||
self._message = None
|
||||
self._upstream_version = None
|
||||
self._debian_revision = None
|
||||
self._epoch = None
|
||||
|
||||
def __repr__(self):
|
||||
return repr(self.control_str)
|
||||
|
@ -76,33 +81,76 @@ class Dpkg(object):
|
|||
def __str__(self):
|
||||
return six.text_type(self.control_str)
|
||||
|
||||
def __getattr__(self, attr):
|
||||
"""Overload getattr to treat control message headers as object
|
||||
attributes (so long as they do not conflict with an existing
|
||||
attribute).
|
||||
|
||||
:param attr: string
|
||||
:returns: string
|
||||
:raises: AttributeError
|
||||
"""
|
||||
# beware: email.Message[nonexistent] returns None not KeyError
|
||||
if attr in self.message:
|
||||
return self.message[attr]
|
||||
else:
|
||||
raise AttributeError("'Dpkg' object has no attribute '%s'" % attr)
|
||||
|
||||
def __getitem__(self, item):
|
||||
"""Overload getitem to treat the control message plus our local
|
||||
properties as items.
|
||||
|
||||
:param item: string
|
||||
:returns: string
|
||||
:raises: KeyError
|
||||
"""
|
||||
try:
|
||||
return getattr(self, item)
|
||||
except AttributeError:
|
||||
try:
|
||||
return self.__getattr__(item)
|
||||
except AttributeError:
|
||||
raise KeyError(item)
|
||||
|
||||
@property
|
||||
def message(self):
|
||||
"""Return an email.Message object containing the package control
|
||||
structure."""
|
||||
if not self._message:
|
||||
structure.
|
||||
|
||||
:returns: email.Message
|
||||
"""
|
||||
if self._message is None:
|
||||
self._message = self._process_dpkg_file(self.filename)
|
||||
return self._message
|
||||
|
||||
@property
|
||||
def control_str(self):
|
||||
"""Return the control message as a string"""
|
||||
if not self._control_str:
|
||||
"""Return the control message as a string
|
||||
|
||||
:returns: string
|
||||
"""
|
||||
if self._control_str is None:
|
||||
self._control_str = self.message.as_string()
|
||||
return self._control_str
|
||||
|
||||
@property
|
||||
def headers(self):
|
||||
"""Return the control message headers as a dict"""
|
||||
if not self._headers:
|
||||
"""Return the control message headers as a dict
|
||||
|
||||
:returns: dict
|
||||
"""
|
||||
if self._headers is None:
|
||||
self._headers = dict(self.message.items())
|
||||
return self._headers
|
||||
|
||||
@property
|
||||
def fileinfo(self):
|
||||
"""Return a dictionary containing md5/sha1/sha256 checksums
|
||||
and the size in bytes of our target file."""
|
||||
if not self._fileinfo:
|
||||
and the size in bytes of our target file.
|
||||
|
||||
:returns: dict
|
||||
"""
|
||||
if self._fileinfo is None:
|
||||
h_md5 = md5()
|
||||
h_sha1 = sha1()
|
||||
h_sha256 = sha256()
|
||||
|
@ -121,26 +169,84 @@ class Dpkg(object):
|
|||
|
||||
@property
|
||||
def md5(self):
|
||||
"""Return the md5 hash of our target file"""
|
||||
"""Return the md5 hash of our target file
|
||||
|
||||
:returns: string
|
||||
"""
|
||||
return self.fileinfo['md5']
|
||||
|
||||
@property
|
||||
def sha1(self):
|
||||
"""Return the sha1 hash of our target file"""
|
||||
"""Return the sha1 hash of our target file
|
||||
|
||||
:returns: string
|
||||
"""
|
||||
return self.fileinfo['sha1']
|
||||
|
||||
@property
|
||||
def sha256(self):
|
||||
"""Return the sha256 hash of our target file"""
|
||||
"""Return the sha256 hash of our target file
|
||||
|
||||
:returns: string
|
||||
"""
|
||||
return self.fileinfo['sha256']
|
||||
|
||||
@property
|
||||
def filesize(self):
|
||||
"""Return the size of our target file"""
|
||||
"""Return the size of our target file
|
||||
|
||||
:returns: string
|
||||
"""
|
||||
return self.fileinfo['filesize']
|
||||
|
||||
@property
|
||||
def epoch(self):
|
||||
"""Return the epoch portion of the package version string
|
||||
|
||||
:returns: int
|
||||
"""
|
||||
if self._epoch is None:
|
||||
self._epoch = self.split_full_version(self.version)[0]
|
||||
return self._epoch
|
||||
|
||||
@property
|
||||
def upstream_version(self):
|
||||
"""Return the upstream portion of the package version string
|
||||
|
||||
:returns: string
|
||||
"""
|
||||
if self._upstream_version is None:
|
||||
self._upstream_version = self.split_full_version(self.version)[1]
|
||||
return self._upstream_version
|
||||
|
||||
@property
|
||||
def debian_revision(self):
|
||||
"""Return the debian revision portion of the package version string
|
||||
|
||||
:returns: string
|
||||
"""
|
||||
if self._debian_revision is None:
|
||||
self._debian_revision = self.split_full_version(self.version)[2]
|
||||
return self._debian_revision
|
||||
|
||||
def get(self, item, default=None):
|
||||
"""Return an object property, a message header, None or the caller-
|
||||
provided default.
|
||||
|
||||
:param item: string
|
||||
:param default:
|
||||
:returns: string
|
||||
"""
|
||||
try:
|
||||
return self.__getitem__(item)
|
||||
except KeyError:
|
||||
return default
|
||||
|
||||
def get_header(self, header):
|
||||
""" case-independent query for a control message header value """
|
||||
"""Return an individual control message header
|
||||
|
||||
:returns: string or None
|
||||
"""
|
||||
return self.message.get(header)
|
||||
|
||||
def compare_version_with(self, version_str):
|
||||
|
@ -194,8 +300,6 @@ class Dpkg(object):
|
|||
|
||||
for req in REQUIRED_HEADERS:
|
||||
if req not in list(map(str.lower, message.keys())):
|
||||
import pdb
|
||||
pdb.set_trace()
|
||||
if self.ignore_missing:
|
||||
self._log.debug(
|
||||
'Header "%s" not found in control message', req)
|
||||
|
|
2
setup.py
2
setup.py
|
@ -1,6 +1,6 @@
|
|||
from distutils.core import setup
|
||||
|
||||
__VERSION__ = '1.1.2'
|
||||
__VERSION__ = '1.2.0'
|
||||
|
||||
setup(
|
||||
name='pydpkg',
|
||||
|
|
|
@ -1,12 +1,45 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
import os
|
||||
import unittest
|
||||
from functools import cmp_to_key
|
||||
from email.message import Message
|
||||
|
||||
from pydpkg import Dpkg, DpkgVersionError
|
||||
|
||||
|
||||
TEST_DPKG_FILE = 'testdeb_1:0.0.0-test_all.deb'
|
||||
|
||||
|
||||
class DpkgTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
dpkgfile = os.path.join(os.path.dirname(__file__), TEST_DPKG_FILE)
|
||||
self.dpkg = Dpkg(dpkgfile)
|
||||
|
||||
def test_get_versions(self):
|
||||
self.assertEqual(self.dpkg.epoch, 1)
|
||||
self.assertEqual(self.dpkg.upstream_version, '0.0.0')
|
||||
self.assertEqual(self.dpkg.debian_revision, 'test')
|
||||
|
||||
def test_get_message_headers(self):
|
||||
self.assertEqual(self.dpkg.package, 'testdeb')
|
||||
self.assertEqual(self.dpkg.PACKAGE, 'testdeb')
|
||||
self.assertEqual(self.dpkg['package'], 'testdeb')
|
||||
self.assertEqual(self.dpkg['PACKAGE'], 'testdeb')
|
||||
self.assertEqual(self.dpkg.get('package'), 'testdeb')
|
||||
self.assertEqual(self.dpkg.get('PACKAGE'), 'testdeb')
|
||||
self.assertEqual(self.dpkg.get('nonexistent'), None)
|
||||
self.assertEqual(self.dpkg.get('nonexistent', 'foo'), 'foo')
|
||||
|
||||
def test_missing_header(self):
|
||||
self.assertRaises(KeyError, self.dpkg.__getitem__, 'xyzzy')
|
||||
self.assertRaises(AttributeError, self.dpkg.__getattr__, 'xyzzy')
|
||||
|
||||
def test_message(self):
|
||||
self.assertIsInstance(self.dpkg.message, type(Message()))
|
||||
|
||||
|
||||
class DpkgVersionsTest(unittest.TestCase):
|
||||
|
||||
def test_get_epoch(self):
|
||||
self.assertEqual(Dpkg.get_epoch('0'), (0, '0'))
|
||||
|
|
Binary file not shown.
Loading…
Reference in New Issue