HASH_BUFFER_SIZE = 64 * 1024  # default read chunk: 64 KiB


def compute_sha256_of_file(file_path, buffer_size=HASH_BUFFER_SIZE):
    """
    Compute the SHA-256 digest of a file's contents.

    The file is read chunk by chunk, so it is never loaded into memory as a whole.

    :param file_path: path of the file to hash (anything ``open`` accepts).
    :param buffer_size: number of bytes read per chunk; defaults to ``HASH_BUFFER_SIZE``.
    :return: the digest as a hexadecimal string.
    :raises ValueError: if ``buffer_size`` is smaller than 1.
    :raises OSError: if the file cannot be opened or read.
    """
    # read(0) returns b'' immediately, which would silently yield the hash of
    # empty input; reject such sizes instead of returning a wrong digest.
    if buffer_size < 1:
        raise ValueError(f'buffer_size must be a positive integer, got {buffer_size!r}')

    sha256 = hashlib.sha256()
    with open(file_path, 'rb') as fp:
        # iter() with a sentinel keeps calling read() until it returns b'' (end of file)
        for chunk in iter(lambda: fp.read(buffer_size), b''):
            sha256.update(chunk)
    return sha256.hexdigest()
def unpack_function(file_path, tmp_dir):
    '''
    Run binwalk on a file of unknown type and filter the carved results.

    file_path is the file handed to binwalk.
    tmp_dir should be used to store the extracted files.

    Returns a dict with the binwalk output under 'output' and the log of the
    false positive filter under 'filter_log'.
    '''
    # Local import so this function does not depend on a change to the module's import block.
    from shlex import quote

    logging.debug(f'File Type unknown: execute binwalk on {file_path}')

    # Quote both paths: unquoted, a path containing whitespace or shell
    # metacharacters would be split into several arguments or alter the command.
    command = (
        'binwalk --run-as=root --extract --carve --signature '
        f'--directory {quote(str(tmp_dir))} {quote(str(file_path))}'
    )
    # NOTE(review): timeout is presumably given in seconds - confirm against common_helper_process.
    output = execute_interactive_shell_command(command, timeout=600)

    drop_underscore_directory(tmp_dir)
    filter_log = ArchivesFilter(tmp_dir, original_file=file_path).remove_false_positive_archives()
    return {'output': output, 'filter_log': filter_log}
def check_zlib_archive_validity(self, file_path):
    '''
    Remove a carved zlib file unless it holds a complete stream with content.

    The file counts as valid only if it decompresses without a zlib error up to
    the end of the stream and yields at least one byte. Invalid files are passed
    to self.remove_file. Anything after the end of the stream is not inspected.

    The data is decompressed chunk by chunk and the output is discarded, so
    neither the whole file nor its whole decompressed payload is held in memory
    (the payload of a carved stream may be far larger than the file itself).
    '''
    chunk_size = 64 * 1024
    decompressor = zlib.decompressobj()
    produced_output = False

    try:
        with open(file_path, 'rb') as fp:
            while not decompressor.eof:
                chunk = fp.read(chunk_size)
                if not chunk:
                    # file ended before the stream did -> truncated or empty
                    break
                if decompressor.decompress(chunk):
                    produced_output = True
    except zlib.error:
        is_valid = False
    else:
        # It's only a valid file if the stream is complete and has data.
        is_valid = decompressor.eof and produced_output

    if not is_valid:
        self.remove_file(file_path)
def test_compute_sha256_of_file(self):
    # Hash of a file must equal the hash of the same bytes computed in memory.
    # The payload is larger than the 64 KiB read buffer of compute_sha256_of_file,
    # so the chunked read loop runs more than once.
    data = b'-' * 200000

    with TemporaryDirectory(prefix='unit_test_') as tmp_dir:
        test_file = Path(tmp_dir) / 'test_file'
        test_file.write_bytes(data)

        sha256 = compute_sha256_of_file(test_file)
        # test that the computed sha is the same with both methods
        self.assertEqual(get_sha256(data), sha256, "not correct from file")