Skip to content

Commit

Permalink
cpiofile: Add unittest opening a symlink as fileobj
Browse files Browse the repository at this point in the history
Signed-off-by: Bernhard Kaindl <[email protected]>
  • Loading branch information
bernhardkaindl committed May 16, 2024
1 parent 2d200d8 commit 855cb9b
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 31 deletions.
3 changes: 3 additions & 0 deletions .vscode/ltex.dictionary.en-US.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ codecov
covcombine
coverallsapp
cpio
cpiofile
cpioinfo
euxv
ibft
ifrename
kname
lastboot
linkname
logbuf
MACPCI
nektos
Expand Down
80 changes: 55 additions & 25 deletions tests/test_cpiofile.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import pytest
from pyfakefs.fake_filesystem import FakeFileOpen, FakeFilesystem

from xcp.cpiofile import CpioFile
from xcp.cpiofile import CpioFile, StreamError

binary_data = b"\x00\x1b\x5b\x95\xb1\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xcc\xdd\xee\xff"

Expand Down Expand Up @@ -48,53 +48,79 @@ def test_cpiofile_modes(fs):
if comp == "xz" and filetype == ":":
continue # streaming xz is not implemented (supported only as file)
check_archive_mode(filetype + comp, fs)
if filetype == "|":
check_archive_mode(filetype + comp, fs, filename="archive." + comp)


def check_archive_mode(archive_mode, fs):
# type: (str, FakeFilesystem) -> None
def create_cpio_archive(fs, archive_mode, filename=None):
# type: (FakeFilesystem, str, str | None) -> io.BytesIO | None
"""
Test CpioFile in the given archive mode with verification of the archive contents.
Create a CpioFile archive with files and directories from a FakeFilesystem.
:param fs: `FakeFilesystem` fixture representing a simulated file system for testing
:param archive_mode: The archive mode is a string parameter that specifies the mode
in which the CpioFile object should be opened.
:param fs: `FakeFilesystem` fixture representing a simulated file system for testing
:param filename: The name of the file to create the cpio archive
"""
# Step 1: Create and populate a cpio archive in a BytesIO buffer
bytesio = io.BytesIO()
archive = CpioFile.open(fileobj=bytesio, mode="w" + archive_mode)
pyfakefs_populate_archive(archive, fs)
cpiofile = None if filename else io.BytesIO()
fs.reset()
cpio = CpioFile.open(name=filename, fileobj=cpiofile, mode="w" + archive_mode)
pyfakefs_populate_archive(cpio, fs)
if archive_mode == "|gz":
archive.list(verbose=True)
archive.close()
cpio.list(verbose=True)
cpio.close()
if not cpiofile:
cpio_data = FakeFileOpen(fs)(filename, "rb").read()
fs.reset()
fs.create_file(filename, contents=cast(str, cpio_data))
return None
fs.reset()
cpiofile.seek(0)
return cpiofile


def check_archive_mode(archive_mode, fs, filename=None):
# type: (str, FakeFilesystem, str | None) -> None
"""
Test CpioFile in the given archive mode with verification of the archive contents.
:param archive_mode: The archive mode is a string parameter that specifies the mode
in which the CpioFile object should be opened.
:param fs: `FakeFilesystem` fixture representing a simulated file system for testing
"""
# Step 2: Extract the archive in a clean filesystem and verify the extracted contents
fs.reset()
bytesio.seek(0)
archive = CpioFile.open(fileobj=bytesio, mode="r" + archive_mode)
cpiofile = create_cpio_archive(fs, archive_mode, filename)
archive = CpioFile.open(name=filename, fileobj=cpiofile, mode="r" + archive_mode)
archive.extractall()
pyfakefs_verify_filesystem(fs)
assert archive.getnames() == ["dirname", "dirname/filename", "dir2/symlink"]
assert archive.getnames() == ["dirname", "dirname/filename", "symlink", "dir2/file_2"]
dirs = [cpioinfo.name for cpioinfo in archive.getmembers() if cpioinfo.isdir()]
files = [cpioinfo.name for cpioinfo in archive.getmembers() if cpioinfo.isreg()]
symlinks = [cpioinfo.name for cpioinfo in archive.getmembers() if cpioinfo.issym()]
assert dirs == ["dirname"]
assert files == ["dirname/filename"]
assert symlinks == ["dir2/symlink"]
assert archive.getmember(symlinks[0]).linkname == "symlink_target"
assert files == ["dirname/filename", "dir2/file_2"]
assert symlinks == ["symlink"]
assert archive.getmember(symlinks[0]).linkname == "dirname/filename"

# Test extracting a symlink to a file object:
if archive_mode.startswith("|"): # Non-seekable streams raise StreamError
with pytest.raises(StreamError):
archive.extractfile("symlink")
else: # Expect a seekable fileobj for this test (not a stream) to work:
fileobj = archive.extractfile("symlink")
assert fileobj and fileobj.read() == binary_data
archive.close()

# Step 3: Extract the archive a second time using another method
fs.reset()
bytesio.seek(0)
archive = CpioFile.open(fileobj=bytesio, mode="r" + archive_mode)
cpiofile = create_cpio_archive(fs, archive_mode, filename)
archive = CpioFile.open(name=filename, fileobj=cpiofile, mode="r" + archive_mode)
if archive_mode[0] != "|":
for cpioinfo in archive:
archive.extract(cpioinfo)
pyfakefs_verify_filesystem(fs)
if archive_mode == "|xz":
archive.list(verbose=True)
archive.close()
bytesio.close()


def pyfakefs_populate_archive(archive, fs):
Expand All @@ -105,11 +131,12 @@ def pyfakefs_populate_archive(archive, fs):
:param archive: Instance of the CpioFile class to create a new cpio archive
:param fs: `FakeFilesystem` fixture representing a simulated file system for testing
"""
fs.reset()

fs.create_file("dirname/filename", contents=cast(str, binary_data))
archive.add("dirname", recursive=True)
fs.create_symlink("directory/symlink", "symlink_target")
fs.create_file("directory/file_2", contents=cast(str, binary_data))
fs.create_symlink("symlink", "dirname/filename")
archive.add("symlink")

# Test special code path of archive.add(".", ...):
os.chdir("directory")
Expand All @@ -129,7 +156,10 @@ def pyfakefs_verify_filesystem(fs):
:param fs: `FakeFilesystem` fixture representing a simulated file system for testing
"""
assert fs.islink("dir2/symlink")
assert fs.islink("symlink")
assert fs.isfile("dirname/filename")
assert fs.isfile("dir2/file_2")
with FakeFileOpen(fs)("dirname/filename", "rb") as contents:
assert contents.read() == binary_data
with FakeFileOpen(fs)("dir2/file_2", "rb") as contents:
assert contents.read() == binary_data
9 changes: 3 additions & 6 deletions xcp/cpiofile.py
Original file line number Diff line number Diff line change
Expand Up @@ -1463,7 +1463,7 @@ def extract(self, member, path=""):
self._dbg(1, "cpiofile: %s" % e)

def extractfile(self, member):
# type:(CpioInfo) -> ExFileObject | None
# type:(CpioInfo | str | bytes) -> ExFileObject | None
"""Extract a member from the archive as a file object. `member' may be
a filename or a CpioInfo object. If `member' is a regular file, a
file-like object is returned. If `member' is a link, a file-like
Expand All @@ -1489,11 +1489,8 @@ def extractfile(self, member):
# A small but ugly workaround for the case that someone tries
# to extract a symlink as a file-object from a non-seekable
# stream of cpio blocks.
raise StreamError("cannot extract symlink as file object")
else:
# A symlink's file object is its target's file object.
return self.extractfile(self._getmember(cpioinfo.linkname,
cpioinfo)) # type: ignore
raise StreamError("Need a seekable stream to open() a symlink target!")
return self.extractfile(cpioinfo.linkname)
else:
# If there's no data associated with the member (directory, chrdev,
# blkdev, etc.), return None instead of a file object.
Expand Down

0 comments on commit 855cb9b

Please sign in to comment.