From 7a2439a08cd736db379490558a0b34e72a68275e Mon Sep 17 00:00:00 2001 From: "Nathan J. Mehl" Date: Tue, 24 Jan 2017 12:00:07 -0800 Subject: [PATCH] initial commit --- LICENSE.txt | 13 ++ README.md | 77 +++++++++++ pydpkg/__init__.py | 320 +++++++++++++++++++++++++++++++++++++++++++++ setup.cfg | 2 + setup.py | 20 +++ tests/test_dpkg.py | 144 ++++++++++++++++++++ 6 files changed, 576 insertions(+) create mode 100644 LICENSE.txt create mode 100644 README.md create mode 100644 pydpkg/__init__.py create mode 100644 setup.cfg create mode 100644 setup.py create mode 100644 tests/test_dpkg.py diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..019d39c --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,13 @@ +Copyright [2017] The Climate Corporation (https://climate.com) + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..99b59d2 --- /dev/null +++ b/README.md @@ -0,0 +1,77 @@ +python-dpkg +=========== + +This library can be used to: + +1. read and extract control data from Debian-format package files, even + on platforms that generally lack a native implementation of dpkg + +2. 
compare dpkg version strings, using a pure Python implementation of + the algorithm described at + https://www.debian.org/doc/debian-policy/ch-controlfields.html#s-f-Version + +This is primarily intended for use on platforms that do not normally +ship [python-apt](http://apt.alioth.debian.org/python-apt-doc/) due to +licensing restrictions or the lack of a native libapt.so (e.g. macOS) + +Currently only tested on Python 2.6 and 2.7. Should run on any python2 +distribution that can install the [arpy](https://pypi.python.org/pypi/arpy/) +library. + + +Usage +===== + +Read and extract headers +------------------------ + + >>> from pydpkg import Dpkg + >>> dp = Dpkg('/tmp/testdeb_1:0.0.0-test_all.deb') + + >>> dp.headers + {'maintainer': u'Climate Corp Engineering ', 'description': u'testdeb\n a bogus debian package for testing dpkg builds', 'package': u'testdeb', 'section': u'base', 'priority': u'extra', 'installed-size': u'0', 'version': u'1:0.0.0-test', 'architecture': u'all'} + + >>> print dp + Package: testdeb + Version: 1:0.0.0-test + Section: base + Priority: extra + Architecture: all + Installed-Size: 0 + Maintainer: Climate Corp Engineering + Description: testdeb + a bogus debian package for testing dpkg builds + +Get an arbitrary control header, case-independent +------------------------------------------------- + + >>> dp.get_header('version') + u'1:0.0.0-test' + + >>> dp.get_header('VERSION') + u'1:0.0.0-test' + +Compare current version to a candidate version +---------------------------------------------- + + >>> dp.compare_version_with('1.0') + 1 + + >>> dp.compare_version_with('1:1.0') + -1 + +Compare two arbitrary version strings +------------------------------------- + + >>> from pydpkg import Dpkg + >>> ver_1 = '0:1.0-test1' + >>> ver_2 = '0:1.0-test2' + >>> Dpkg.compare_versions(ver_1, ver_2) + -1 + +Use as a cmp function to sort a list of version strings +------------------------------------------------------- + + >>> from pydpkg import Dpkg + >>> 
REQUIRED_HEADERS = ('package', 'version', 'architecture')


class DpkgError(Exception):
    """Base class for the errors raised by this module."""


class DpkgVersionError(DpkgError):
    """Raised when a version string cannot be parsed."""


class DpkgMissingControlFile(DpkgError):
    """Raised when control.tar.gz holds no member named ``control``."""


class DpkgMissingControlGzipFile(DpkgError):
    """Raised when the ar archive holds no ``control.tar.gz`` member."""


class DpkgMissingRequiredHeaderError(DpkgError):
    """Raised when the control file lacks one of REQUIRED_HEADERS."""


class Dpkg(object):

    """Class allowing import and manipulation of a debian package file.

    Instances read the control section of a package file and expose its
    headers through ``headers`` / ``get_header``.  The version helpers are
    static methods and can be used without constructing an instance.
    """

    def __init__(self, filename=None):
        """Load the control headers of the package file at *filename*.

        Raises DpkgError if *filename* is not a string or does not name an
        existing file; errors from parsing the archive propagate as raised
        by ``_process_dpkg_file``.
        """
        self.headers = {}
        if not isinstance(filename, basestring):
            raise DpkgError('filename argument must be a string')
        if not os.path.isfile(filename):
            # interpolate the name into the message instead of passing it
            # as a second, unformatted exception argument
            raise DpkgError('filename "%s" does not exist' % filename)
        self.control_str, self._control_headers = self._process_dpkg_file(
            filename)
        for k in self._control_headers.keys():
            self.headers[k] = self._control_headers[k]

    def __repr__(self):
        """Return the text of the control file."""
        return self.control_str

    def get_header(self, header):
        """Case-independent query for a control message header value.

        Returns an empty string when the header is absent.
        """
        return self.headers.get(header.lower(), '')

    def compare_version_with(self, version_str):
        """Compare this package's version header against *version_str*.

        Returns the result of ``compare_versions(own_version, version_str)``.
        """
        return Dpkg.compare_versions(
            self.get_header('version'),
            version_str)

    def _force_encoding(self, obj, encoding='utf-8'):
        """Decode byte strings to unicode; return anything else untouched."""
        if isinstance(obj, basestring):
            if not isinstance(obj, unicode):
                obj = unicode(obj, encoding)
        return obj

    def _process_dpkg_file(self, filename):
        """Extract the control file from the package at *filename*.

        Returns a ``(control_str, control_headers)`` tuple: the decoded text
        of the control file and the parsed header mapping.

        Raises DpkgMissingControlGzipFile, DpkgMissingControlFile or
        DpkgMissingRequiredHeaderError when the corresponding piece is
        absent.
        """
        dpkg = Archive(filename)
        dpkg.read_all_headers()

        if 'control.tar.gz' not in dpkg.archived_files:
            raise DpkgMissingControlGzipFile(
                'Corrupt dpkg file: no control.tar.gz file in ar archive.')

        control_tgz = dpkg.archived_files['control.tar.gz']

        # have to do an intermediate step because gzipfile doesn't support
        # seek from end; luckily control tars are tiny
        control_tar_intermediate = GzipFile(fileobj=control_tgz, mode='rb')
        tar_data = control_tar_intermediate.read()
        sio = StringIO(tar_data)
        control_tar = tarfile.open(fileobj=sio)

        # pathname in the tar could be ./control, or just control; take the
        # first member whose basename matches
        control_member = None
        for member in control_tar.getmembers():
            if os.path.basename(member.name) == 'control':
                control_member = member
                break
        if control_member is None:
            raise DpkgMissingControlFile(
                'Corrupt dpkg file: no control file in control.tar.gz.')

        control_file = control_tar.extractfile(control_member)

        # beware: dpkg will happily let people drop random encodings into
        # the control file
        control_str = self._force_encoding(control_file.read())

        # rewind so the header parser sees the file from the start
        control_file.seek(0)
        control_headers = Message(control_file)

        for header in REQUIRED_HEADERS:
            if header not in control_headers:
                raise DpkgMissingRequiredHeaderError(
                    'Corrupt control section; header: "%s" not found' % header)

        # iterate over a snapshot of the keys: the assignment below rewrites
        # entries of the very mapping being walked
        for header in control_headers.keys():
            control_headers[header] = self._force_encoding(
                control_headers[header])

        return control_str, control_headers

    @staticmethod
    def get_epoch(version_str):
        """Parse the epoch out of a package version string.

        Return (epoch, version); epoch is zero if not found.  Raises
        DpkgVersionError when the text before the first colon is not an
        integer.
        """
        # there could be more than one colon, but only the first one counts
        epoch_str, colon, remainder = version_str.partition(':')
        if not colon:
            # no colons means no epoch; that's valid
            return 0, version_str

        try:
            epoch = int(epoch_str)
        except ValueError:
            raise DpkgVersionError(
                'Corrupt dpkg version %s: epochs can only be ints, and '
                'epochless versions cannot use the colon character.' %
                version_str)

        return epoch, remainder

    @staticmethod
    def get_upstream(version_str):
        """Split an epoch-less version into (upstream, debian) revisions.

        The split happens at the last hyphen.  If there is no hyphen the
        debian revision is reported as the string '0'.
        """
        upstream_rev, hyphen, debian_rev = version_str.rpartition('-')
        if not hyphen:
            # no hyphens means no debian version, also valid
            return version_str, '0'
        return upstream_rev, debian_rev

    @staticmethod
    def split_full_version(version_str):
        """Return the (epoch, upstream_revision, debian_revision) triple."""
        epoch, full_ver = Dpkg.get_epoch(version_str)
        upstream_rev, debian_rev = Dpkg.get_upstream(full_ver)
        return epoch, upstream_rev, debian_rev

    @staticmethod
    def get_alphas(revision_str):
        """Return a tuple of the leading non-digit characters of a revision
        (which may be empty) and the remaining characters."""
        for i, char in enumerate(revision_str):
            if char.isdigit():
                if i == 0:
                    return '', revision_str
                return revision_str[0:i], revision_str[i:]
        # string is entirely alphas
        return revision_str, ''

    @staticmethod
    def get_digits(revision_str):
        """Return a tuple of the leading digits of a revision, as an int
        (0 when there are none), and the remaining characters."""
        if not revision_str:
            return 0, ''

        for i, char in enumerate(revision_str):
            if not char.isdigit():
                if i == 0:
                    return 0, revision_str
                return int(revision_str[0:i]), revision_str[i:]
        # string is entirely digits
        return int(revision_str), ''

    @staticmethod
    def listify(revision_str):
        """Split a revision string into a list alternating between strings
        and numbers, padded on either end to always be "str, int, str,
        int..." and always be of even length.  This allows us to trivially
        implement the comparison algorithm described at
        http://debian.org/doc/debian-policy/ch-controlfields.html#s-f-Version
        """
        result = []
        while revision_str:
            r1, remains = Dpkg.get_alphas(revision_str)
            r2, remains = Dpkg.get_digits(remains)
            result.extend([r1, r2])
            revision_str = remains
        return result

    @staticmethod
    def _char_order(char):
        """Return the sort key of one character; None means end of string.

        Keys order as: tilde, end of string, letters, everything else, with
        ties inside a class broken by code point.
        """
        if char is None:
            return (0, 0)
        if char == '~':
            return (-1, 0)
        if char.isalpha():
            return (1, ord(char))
        return (2, ord(char))

    @staticmethod
    def dstringcmp(a, b):
        """debian package version string section lexical sort algorithm

        "The lexical comparison is a comparison of ASCII values modified so
        that all the letters sort earlier than all the non-letters and so
        that a tilde sorts before anything, even the end of a part."

        Returns -1, 0 or 1.
        """
        if a == b:
            return 0

        # pad the shorter string with end-of-string markers so both can be
        # walked in lockstep without falling off either end
        width = max(len(a), len(b))
        chars_a = list(a) + [None] * (width - len(a))
        chars_b = list(b) + [None] * (width - len(b))
        for char_a, char_b in zip(chars_a, chars_b):
            key_a = Dpkg._char_order(char_a)
            key_b = Dpkg._char_order(char_b)
            if key_a != key_b:
                if key_a < key_b:
                    return -1
                return 1
        return 0

    @staticmethod
    def compare_revision_strings(rev1, rev2):
        """Compare two revision strings (an upstream or a debian revision).

        Returns -1, 0 or 1.  When one revision runs out first, its missing
        parts compare as an empty string and the number zero, so a trailing
        tilde part sorts before the shorter revision ('1.0~rc1' < '1.0')
        while any other trailing part sorts after it ('1.0' < '1.0.1').
        """
        if rev1 == rev2:
            return 0

        # listify yields [str, int, str, int, ...]: even indexes hold the
        # string parts, odd indexes the numeric parts
        list1 = Dpkg.listify(rev1)
        list2 = Dpkg.listify(rev2)

        # both lists have even length, so pad the shorter one in whole
        # ('', 0) pairs
        width = max(len(list1), len(list2))
        list1.extend(['', 0] * ((width - len(list1)) // 2))
        list2.extend(['', 0] * ((width - len(list2)) // 2))

        for index, (part1, part2) in enumerate(zip(list1, list2)):
            if part1 == part2:
                continue
            if index % 2:
                # numeric comparison
                if part1 > part2:
                    return 1
                return -1
            # string comparison
            return Dpkg.dstringcmp(part1, part2)
        return 0

    @staticmethod
    def compare_versions(ver1, ver2):
        """Compare two full version strings; usable as a cmp function.

        Returns -1, 0 or 1.  Epochs are compared first, then the upstream
        revisions, then the debian revisions.
        """
        if ver1 == ver2:
            return 0

        # note the string conversion: the debian policy here explicitly
        # specifies ASCII string comparisons, so if you are mad enough to
        # actually cram unicode characters into your version string, you
        # are on your own.
        epoch1, upstream1, debian1 = Dpkg.split_full_version(str(ver1))
        epoch2, upstream2, debian2 = Dpkg.split_full_version(str(ver2))

        # if epochs differ, immediately return the newer one
        if epoch1 < epoch2:
            return -1
        if epoch1 > epoch2:
            return 1

        # then, compare the upstream versions
        upstr_res = Dpkg.compare_revision_strings(upstream1, upstream2)
        if upstr_res != 0:
            return upstr_res

        debian_res = Dpkg.compare_revision_strings(debian1, debian2)
        if debian_res != 0:
            return debian_res

        # at this point, the versions are equal, but due to an interpolated
        # zero in either the epoch or the debian version
        return 0
import unittest

from pydpkg import Dpkg, DpkgVersionError


class DpkgTest(unittest.TestCase):

    """Unit tests for the version parsing and comparison helpers of Dpkg."""

    def _assert_cases(self, func, cases):
        """Assert that func(*args) equals expected for each (args, expected)
        pair, in order."""
        for args, expected in cases:
            self.assertEqual(func(*args), expected)

    def _assert_all(self, func, arg_pairs, expected):
        """Assert that func(*args) equals the one expected value for every
        argument tuple, in order."""
        self._assert_cases(func, [(args, expected) for args in arg_pairs])

    def test_get_epoch(self):
        self._assert_cases(Dpkg.get_epoch, [
            (('0',), (0, '0')),
            (('0:0',), (0, '0')),
            (('1:0',), (1, '0')),
        ])
        self.assertRaises(DpkgVersionError, Dpkg.get_epoch, '1a:0')

    def test_get_upstream(self):
        self._assert_cases(Dpkg.get_upstream, [
            (('00',), ('00', '0')),
            (('foo',), ('foo', '0')),
            (('foo-bar',), ('foo', 'bar')),
            (('foo-bar-baz',), ('foo-bar', 'baz')),
        ])

    def test_split_full_version(self):
        self._assert_cases(Dpkg.split_full_version, [
            (('00',), (0, '00', '0')),
            (('00-00',), (0, '00', '00')),
            (('0:0',), (0, '0', '0')),
            (('0:0-0',), (0, '0', '0')),
            (('0:0.0',), (0, '0.0', '0')),
            (('0:0.0-0',), (0, '0.0', '0')),
            (('0:0.0-00',), (0, '0.0', '00')),
        ])

    def test_get_alpha(self):
        self._assert_cases(Dpkg.get_alphas, [
            (('',), ('', '')),
            (('0',), ('', '0')),
            (('00',), ('', '00')),
            (('0a',), ('', '0a')),
            (('a',), ('a', '')),
            (('a0',), ('a', '0')),
        ])

    def test_get_digits(self):
        self._assert_cases(Dpkg.get_digits, [
            (('00',), (0, '')),
            (('0',), (0, '')),
            (('0a',), (0, 'a')),
            (('a',), (0, 'a')),
            (('a0',), (0, 'a0')),
        ])

    def test_listify(self):
        self._assert_cases(Dpkg.listify, [
            (('0',), ['', 0]),
            (('00',), ['', 0]),
            (('0a',), ['', 0, 'a', 0]),
            (('a0',), ['a', 0]),
            (('a00',), ['a', 0]),
            (('a',), ['a', 0]),
        ])

    def test_dstringcmp(self):
        self._assert_cases(Dpkg.dstringcmp, [
            (('~', '.'), -1),
            (('~', 'a'), -1),
            (('a', '.'), -1),
            (('a', '~'), 1),
            (('.', '~'), 1),
            (('.', 'a'), 1),
            (('.', '.'), 0),
            (('0', '0'), 0),
            (('a', 'a'), 0),
        ])

        # ordering example taken from
        # http://www.debian.org/doc/debian-policy/ch-controlfields.html#s-f-Version
        unsorted = ['a', '', '~', '~~a', '~~']
        self.assertEqual(
            sorted(unsorted, cmp=Dpkg.dstringcmp),
            ['~~', '~~a', '~', '', 'a'])

    def test_compare_revision_strings(self):
        # each string here is a single revision, not a full
        # upstream+debian version: "0.0.9-foo" is one revision in its own
        # right, not an upstream of 0.0.9 plus a debian revision of foo.
        compare = Dpkg.compare_revision_strings

        # equals
        self._assert_all(compare, [
            ('0', '0'),
            ('0', '00'),
            ('00.0.9', '0.0.9'),
            ('0.00.9-foo', '0.0.9-foo'),
            ('0.0.9-1.00foo', '0.0.9-1.0foo'),
        ], 0)

        # less than
        self._assert_all(compare, [
            ('0.0.9', '0.0.10'),
            ('0.0.9-foo', '0.0.10-foo'),
            ('0.0.9-foo', '0.0.10-goo'),
            ('0.0.9-foo', '0.0.9-goo'),
            ('0.0.10-foo', '0.0.10-goo'),
            ('0.0.9-1.0foo', '0.0.9-1.1foo'),
        ], -1)

        # greater than
        self._assert_all(compare, [
            ('0.0.10', '0.0.9'),
            ('0.0.10-foo', '0.0.9-foo'),
            ('0.0.10-foo', '0.0.9-goo'),
            ('0.0.9-1.0foo', '0.0.9-1.0bar'),
        ], 1)

    def test_compare_versions(self):
        compare = Dpkg.compare_versions

        self._assert_all(compare, [
            # "This [the epoch] is a single (generally small) unsigned
            # integer.  It may be omitted, in which case zero is assumed."
            ('0.0.0', '0:0.0.0'),
            ('0:0.0.0-foo', '0.0.0-foo'),
            ('0.0.0-a', '0:0.0.0-a'),
            # "The absence of a debian_revision is equivalent to a
            # debian_revision of 0."
            ('0.0.0', '0.0.0-0'),
            # tricksy:
            ('0.0.0', '0.0.0-00'),
            # combining the above
            ('0.0.0-0', '0:0.0.0'),
            # explicitly equal
            ('0.0.0', '0.0.0'),
            ('1:0.0.0', '1:0.0.0'),
            ('0.0.0-10', '0.0.0-10'),
            ('2:0.0.0-1', '2:0.0.0-1'),
            ('0:a.0.0-foo', '0:a.0.0-foo'),
        ], 0)

        # less than
        self._assert_all(compare, [
            ('0.0.0-0', '0:0.0.1'),
            ('0.0.0-0', '0:0.0.0-a'),
            ('0.0.0-0', '0:0.0.0-1'),
            ('0.0.9', '0.0.10'),
            ('0.9.0', '0.10.0'),
            ('9.0.0', '10.0.0'),
        ], -1)

        # greater than
        self._assert_all(compare, [
            ('0.0.1-0', '0:0.0.0'),
            ('0.0.0-a', '0:0.0.0-1'),
            ('0.0.0-a', '0:0.0.0-0'),
            ('0.0.9', '0.0.1'),
            ('0.9.0', '0.1.0'),
            ('9.0.0', '1.0.0'),
        ], 1)

        # unicode me harder
        self._assert_cases(compare, [
            ((u'2:0.0.44-1', u'2:0.0.44-nobin'), -1),
            ((u'2:0.0.44-nobin', u'2:0.0.44-1'), 1),
            ((u'2:0.0.44-1', u'2:0.0.44-1'), 0),
        ])


if __name__ == "__main__":
    loader = unittest.TestLoader()
    runner = unittest.TextTestRunner(verbosity=2)
    runner.run(loader.loadTestsFromTestCase(DpkgTest))