Skip to content

Commit

Permalink
Merge pull request #11 from TheClimateCorporation/xv-support
Browse files Browse the repository at this point in the history
Add support for lzma-compressed control archives
  • Loading branch information
memory authored Dec 2, 2019
2 parents 27e2924 + 79088e1 commit 710cc25
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 39 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
target/
*.egg-info
.cache
venv/
82 changes: 48 additions & 34 deletions pydpkg/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import hashlib
import io
import logging
import lzma
import os
import tarfile

Expand Down Expand Up @@ -40,11 +41,11 @@ class DpkgVersionError(DpkgError):


class DpkgMissingControlFile(DpkgError):
"""No control file found in control.tar.gz"""
"""No control file found in control.tar.gz/xz"""


class DpkgMissingControlGzipFile(DpkgError):
"""No control.tar.gz file found in dpkg file"""
"""No control.tar.gz/xz file found in dpkg file"""


class DpkgMissingRequiredHeaderError(DpkgError):
Expand Down Expand Up @@ -275,42 +276,55 @@ def _force_encoding(obj, encoding='utf-8'):
obj = six.text_type(obj, encoding)
return obj

def _extract_message(self, ctar):
# pathname in the tar could be ./control, or just control
# (there would never be two control files...right?)
tar_members = [
os.path.basename(x.name) for x in ctar.getmembers()]
self._log.debug('got tar members: %s', tar_members)
if 'control' not in tar_members:
raise DpkgMissingControlFile(
'Corrupt dpkg file: no control file in control.tar.gz')
control_idx = tar_members.index('control')
self._log.debug('got control index: %s', control_idx)
# at last!
control_file = ctar.extractfile(
ctar.getmembers()[control_idx])
self._log.debug('got control file: %s', control_file)
message_body = control_file.read()
# py27 lacks email.message_from_bytes, so...
if isinstance(message_body, bytes):
message_body = message_body.decode('utf-8')
message = message_from_string(message_body)
self._log.debug('got control message: %s', message)
return message

def _process_dpkg_file(self, filename):
dpkg_archive = Archive(filename)
dpkg_archive.read_all_headers()
try:
control_tgz = dpkg_archive.archived_files[b'control.tar.gz']
except KeyError:
if b'control.tar.gz' in dpkg_archive.archived_files:
control_archive = dpkg_archive.archived_files[b'control.tar.gz']
control_archive_type = "gz"
elif b'control.tar.xz' in dpkg_archive.archived_files:
control_archive = dpkg_archive.archived_files[b'control.tar.xz']
control_archive_type = "xz"
else:
raise DpkgMissingControlGzipFile(
'Corrupt dpkg file: no control.tar.gz file in ar archive.')
self._log.debug('found controlgz: %s', control_tgz)

# have to pass through BytesIO because gzipfile doesn't support seek
# from end; luckily control tars are tiny
with GzipFile(fileobj=control_tgz) as gzf:
self._log.debug('opened gzip file: %s', gzf)
with tarfile.open(fileobj=io.BytesIO(gzf.read())) as control_tar:
self._log.debug('opened tar file: %s', control_tar)
# pathname in the tar could be ./control, or just control
# (there would never be two control files...right?)
tar_members = [
os.path.basename(x.name) for x in control_tar.getmembers()]
self._log.debug('got tar members: %s', tar_members)
if 'control' not in tar_members:
raise DpkgMissingControlFile(
'Corrupt dpkg file: no control file in control.tar.gz')
control_idx = tar_members.index('control')
self._log.debug('got control index: %s', control_idx)
# at last!
control_file = control_tar.extractfile(
control_tar.getmembers()[control_idx])
self._log.debug('got control file: %s', control_file)
message_body = control_file.read()
# py27 lacks email.message_from_bytes, so...
if isinstance(message_body, bytes):
message_body = message_body.decode('utf-8')
message = message_from_string(message_body)
self._log.debug('got control message: %s', message)
'Corrupt dpkg file: no control.tar.gz/xz file in ar archive.')
self._log.debug('found controlgz: %s', control_archive)

if control_archive_type == "gz":
with GzipFile(fileobj=control_archive) as gzf:
self._log.debug('opened gzip control archive: %s', gzf)
with tarfile.open(fileobj=io.BytesIO(gzf.read())) as ctar:
self._log.debug('opened tar file: %s', ctar)
message = self._extract_message(ctar)
else:
with lzma.open(control_archive) as xzf:
self._log.debug('opened xz control archive: %s', xzf)
with tarfile.open(fileobj=io.BytesIO(xzf.read())) as ctar:
self._log.debug('opened tar file: %s', ctar)
message = self._extract_message(ctar)

for req in REQUIRED_HEADERS:
if req not in list(map(str.lower, message.keys())):
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from distutils.core import setup

__VERSION__ = '1.4.0'
__VERSION__ = '1.4.1'

setup(
name='pydpkg',
Expand Down
Binary file added tests/sample_package_xz.deb
Binary file not shown.
41 changes: 37 additions & 4 deletions tests/test_dpkg.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
from pydpkg import Dpkg, DpkgVersionError


TEST_DPKG_FILE = 'testdeb_1:0.0.0-test_all.deb'
TEST_DPKG_GZ_FILE = 'testdeb_1:0.0.0-test_all.deb'
TEST_DPKG_XZ_FILE = 'sample_package_xz.deb'


class DpkgTest(unittest.TestCase):
class DpkgGzTest(unittest.TestCase):
def setUp(self):
dpkgfile = os.path.join(os.path.dirname(__file__), TEST_DPKG_FILE)
dpkgfile = os.path.join(os.path.dirname(__file__), TEST_DPKG_GZ_FILE)
self.dpkg = Dpkg(dpkgfile)

def test_get_versions(self):
Expand All @@ -38,6 +39,34 @@ def test_message(self):
self.assertIsInstance(self.dpkg.message, type(Message()))


class DpkgXzTest(unittest.TestCase):
def setUp(self):
dpkgfile = os.path.join(os.path.dirname(__file__), TEST_DPKG_XZ_FILE)
self.dpkg = Dpkg(dpkgfile)

def test_get_versions(self):
self.assertEqual(self.dpkg.epoch, 0)
self.assertEqual(self.dpkg.upstream_version, '0.0.1')
self.assertEqual(self.dpkg.debian_revision, '0')

def test_get_message_headers(self):
self.assertEqual(self.dpkg.package, 'samplepackage.test')
self.assertEqual(self.dpkg.PACKAGE, 'samplepackage.test')
self.assertEqual(self.dpkg['package'], 'samplepackage.test')
self.assertEqual(self.dpkg['PACKAGE'], 'samplepackage.test')
self.assertEqual(self.dpkg.get('package'), 'samplepackage.test')
self.assertEqual(self.dpkg.get('PACKAGE'), 'samplepackage.test')
self.assertEqual(self.dpkg.get('nonexistent'), None)
self.assertEqual(self.dpkg.get('nonexistent', 'foo'), 'foo')

def test_missing_header(self):
self.assertRaises(KeyError, self.dpkg.__getitem__, 'xyzzy')
self.assertRaises(AttributeError, self.dpkg.__getattr__, 'xyzzy')

def test_message(self):
self.assertIsInstance(self.dpkg.message, type(Message()))


class DpkgVersionsTest(unittest.TestCase):

def test_get_epoch(self):
Expand Down Expand Up @@ -174,5 +203,9 @@ def test_compare_versions(self):


if __name__ == "__main__":
suite = unittest.TestLoader().loadTestsFromTestCase(DpkgTest)
suite = unittest.TestLoader().loadTestsFromTestCase(DpkgGzTest)
unittest.TextTestRunner(verbosity=2).run(suite)
suite = unittest.TestLoader().loadTestsFromTestCase(DpkgXzTest)
unittest.TextTestRunner(verbosity=2).run(suite)
suite = unittest.TestLoader().loadTestsFromTestCase(DpkgVersionsTest)
unittest.TextTestRunner(verbosity=2).run(suite)

0 comments on commit 710cc25

Please sign in to comment.