From e5a193b4d11202b13a6f00003a3b1e2a7621501d Mon Sep 17 00:00:00 2001
From: Tim dW
Date: Wed, 8 Sep 2021 08:55:36 -0400
Subject: [PATCH 1/3] adding additional filtering

---
 .../generic_carver/code/generic_carver.py      | 53 ++++++++++++++----
 .../generic_carver/test/data/fake_ubi          | Bin 0 -> 128 bytes
 .../generic_carver/test/data/fake_zlib.zlib    | Bin 0 -> 14 bytes
 .../test_plugin_generic_carver_binwalk.py      | 25 ++++++++-
 4 files changed, 65 insertions(+), 13 deletions(-)
 create mode 100644 fact_extractor/plugins/unpacking/generic_carver/test/data/fake_ubi
 create mode 100644 fact_extractor/plugins/unpacking/generic_carver/test/data/fake_zlib.zlib

diff --git a/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py b/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py
index f2adc4ce..e12e724e 100644
--- a/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py
+++ b/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py
@@ -3,14 +3,15 @@
 '''
 import logging
 import shutil
+import zlib
 from pathlib import Path

-from common_helper_process import execute_shell_command
+from common_helper_process import execute_shell_command, execute_interactive_shell_command
 from fact_helper_file import get_file_type_from_path


 NAME = 'generic_carver'
 MIME_PATTERNS = ['generic/carver']
-VERSION = '0.8'
+VERSION = '0.9'

 TAR_MAGIC = b'ustar'
@@ -21,15 +22,19 @@ def unpack_function(file_path, tmp_dir):

     tmp_dir should be used to store the extracted files.
     '''
-    logging.debug('File Type unknown: execute binwalk on {}'.format(file_path))
-    output = execute_shell_command(f'binwalk --extract --carve --signature --directory {tmp_dir} {file_path}')
-
+    logging.debug(f'File Type unknown: execute binwalk on {file_path}')
+    output = execute_interactive_shell_command(f'binwalk --extract --carve --signature --directory {tmp_dir} {file_path}', timeout=600)
     drop_underscore_directory(tmp_dir)

-    return {'output': output, 'filter_log': ArchivesFilter(tmp_dir).remove_false_positive_archives()}
+    return {'output': output, 'filter_log': ArchivesFilter(tmp_dir, original_file=file_path).remove_false_positive_archives()}


 class ArchivesFilter:
-    def __init__(self, unpack_directory):
+    def __init__(self, unpack_directory, original_file=None):
+        if original_file:
+            self.original_size = Path(original_file).stat().st_size
+        else:
+            self.original_size = None
+
         self.unpack_directory = Path(unpack_directory)
         self.screening_logs = []
@@ -37,20 +42,33 @@ def remove_false_positive_archives(self) -> str:
         for file_path in self.unpack_directory.iterdir():
             file_type = get_file_type_from_path(file_path)['mime']

+            # If the carved file is the same as the original file, then we don't want to keep it.
+            if self.check_file_size_same_as_original(file_path):
+                continue
+
             if file_type == 'application/x-tar' or self._is_possible_tar(file_type, file_path):
                 self.check_archives_validity(file_path, 'tar -tvf {}', 'does not look like a tar archive')
-
             elif file_type == 'application/x-xz':
                 self.check_archives_validity(file_path, 'xz -c -d {} | wc -c')
-
             elif file_type == 'application/gzip':
                 self.check_archives_validity(file_path, 'gzip -c -d {} | wc -c')
-
             elif file_type in ['application/zip', 'application/x-7z-compressed', 'application/x-lzma']:
                 self.check_archives_validity(file_path, '7z l {}', 'ERROR')
+            elif file_type in ['compression/zlib', 'application/zlib']:
+                self.check_zlib_archive_validity(file_path)

         return '\n'.join(self.screening_logs)

+    def check_file_size_same_as_original(self, file_path: Path):
+        # binwalk will occasionally extract a file that is identical to the original input;
+        # it is usually named 0.yyy and is not helpful, so remove it in that case.
+        if self.original_size is not None:
+            file_size = file_path.stat().st_size
+            if self.original_size == file_size:
+                self.remove_file(file_path)
+                return True
+        return False
+
     @staticmethod
     def _is_possible_tar(file_type: str, file_path: Path) -> bool:
         # broken tar archives may be identified as octet-stream by newer versions of libmagic
@@ -78,6 +96,21 @@ def remove_file(self, file_path):
     def output_is_empty(output):
         return int((output.split())[-1]) == 0

+    def check_zlib_archive_validity(self, file_path):
+        with open(file_path, 'rb') as f:
+            data = f.read()
+        valid = False
+        try:
+            uncompressed = zlib.decompress(data)
+            # it is only a valid zlib file if it decompresses to non-empty data
+            if len(uncompressed):
+                valid = True
+        except zlib.error:
+            valid = False
+
+        if not valid:
+            self.remove_file(file_path)
+

 def drop_underscore_directory(tmp_dir):
     extracted_contents = list(Path(tmp_dir).iterdir())
diff --git a/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_ubi b/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_ubi
new file mode 100644
index 0000000000000000000000000000000000000000..23bed9dd58cf524fa3c67968e4cf81792be6ddc4
GIT binary patch
literal 128
zcmXq%5MVR8fFv3~1OrHp35Xe?7%T&07Fx-6gPwRz6r`%84|H~j;t0RU}v56%Dp

literal 0
HcmV?d00001

diff --git a/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_zlib.zlib b/fact_extractor/plugins/unpacking/generic_carver/test/data/fake_zlib.zlib
new file mode 100644
index 0000000000000000000000000000000000000000..babbeac8eb136c478e555abfd94c38e36471bb1e
GIT binary patch
literal 14
Ucmb=J!_3IQz`)1>Br4_r01$ivHUIzs

literal 0
HcmV?d00001

diff --git a/fact_extractor/plugins/unpacking/generic_carver/test/test_plugin_generic_carver_binwalk.py b/fact_extractor/plugins/unpacking/generic_carver/test/test_plugin_generic_carver_binwalk.py
index ca59ebe1..2803231e 100644
--- a/fact_extractor/plugins/unpacking/generic_carver/test/test_plugin_generic_carver_binwalk.py
+++ b/fact_extractor/plugins/unpacking/generic_carver/test/test_plugin_generic_carver_binwalk.py
@@ -1,11 +1,11 @@
 import shutil
 from pathlib import Path
 from tempfile import TemporaryDirectory
-import pytest

+import pytest
+from helperFunctions.file_system import get_test_data_dir
 from plugins.unpacking.generic_carver.code.generic_carver import ArchivesFilter
 from test.unit.unpacker.test_unpacker import TestUnpackerBase
-from helperFunctions.file_system import get_test_data_dir


 # pylint: disable=protected-access
@@ -32,11 +32,30 @@ def test_extraction_of_filtered_files(self):
         assert len(files) == 0
         assert 'was removed' in meta_data['filter_log']
+        in_file = str(TEST_DATA_DIR / 'fake_ubi')
+        files, meta_data = self.unpacker._extract_files_from_file_using_specific_unpacker(in_file, self.tmp_dir.name, self.unpacker.unpacker_plugins['generic/carver'])
+        assert len(files) == 0
+        assert 'was removed' in meta_data['filter_log']
+


-@pytest.mark.parametrize('filename', ['fake_zip.zip', 'fake_tar.tar', 'fake_7z.7z', 'fake_xz.xz', 'fake_gz.gz'])
+@pytest.mark.parametrize('filename', ['fake_zip.zip', 'fake_tar.tar', 'fake_7z.7z', 'fake_xz.xz', 'fake_gz.gz', 'fake_zlib.zlib'])
 def test_remove_false_positives(filename):
     with TemporaryDirectory() as temp_dir:
         test_file = Path(temp_dir) / filename
         shutil.copyfile(TEST_DATA_DIR / filename, test_file)
         ArchivesFilter(temp_dir).remove_false_positive_archives()
         assert test_file.is_file() is False
+
+
+def test_remove_self():
+    with TemporaryDirectory() as temp_dir:
+        data = b'A' * 80
+        org_file = Path(temp_dir) / 'original_file_1'
+        test_file = Path(temp_dir) / 'test_file_1'
+
+        # Make two identical files.
+        org_file.write_bytes(data)
+        test_file.write_bytes(data)
+
+        ArchivesFilter(temp_dir, original_file=str(org_file)).remove_false_positive_archives()
+        assert test_file.is_file() is False

From add8a54adde9f501fc5dad8d769caf229cfe97d3 Mon Sep 17 00:00:00 2001
From: Caesurus <15038848+Caesurus@users.noreply.github.com>
Date: Wed, 15 Sep 2021 11:04:06 -0400
Subject: [PATCH 2/3] Update generic_carver.py

Latest binwalk wants an extra `--run-as=root` parameter
---
 .../plugins/unpacking/generic_carver/code/generic_carver.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py b/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py
index e12e724e..02036263 100644
--- a/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py
+++ b/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py
@@ -23,7 +23,7 @@ def unpack_function(file_path, tmp_dir):
     '''
     logging.debug(f'File Type unknown: execute binwalk on {file_path}')
-    output = execute_interactive_shell_command(f'binwalk --extract --carve --signature --directory {tmp_dir} {file_path}', timeout=600)
+    output = execute_interactive_shell_command(f'binwalk --run-as=root --extract --carve --signature --directory {tmp_dir} {file_path}', timeout=600)
     drop_underscore_directory(tmp_dir)

     return {'output': output, 'filter_log': ArchivesFilter(tmp_dir, original_file=file_path).remove_false_positive_archives()}

From 4eecdbdcb90243dfa1a9fcab24b01d7a6fc1b7a3 Mon Sep 17 00:00:00 2001
From: Tim dW
Date: Thu, 16 Sep 2021 14:48:27 -0400
Subject: [PATCH 3/3] Adding function to compute sha256 from a file on disk.

---
 fact_extractor/helperFunctions/hash.py         | 14 ++++++++++++++
 .../test/unit/helperFunctions/test_hash.py     | 16 +++++++++++++++-
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/fact_extractor/helperFunctions/hash.py b/fact_extractor/helperFunctions/hash.py
index 184da85e..1850e5bf 100644
--- a/fact_extractor/helperFunctions/hash.py
+++ b/fact_extractor/helperFunctions/hash.py
@@ -1,7 +1,9 @@
+import hashlib
 from hashlib import new

 from helperFunctions.dataConversion import make_bytes

+HASH_BUFFER_SIZE = 64 * 1024  # 64KB

 def get_hash(hash_function, binary):
     binary = make_bytes(binary)
@@ -13,3 +15,15 @@

 def get_sha256(code):
     return get_hash('sha256', code)
+
+
+def compute_sha256_of_file(file_path):
+    """ Computes the sha256 of the given file's contents """
+    sha256 = hashlib.sha256()
+    with open(file_path, 'rb') as f:
+        while True:
+            data = f.read(HASH_BUFFER_SIZE)
+            if not data:
+                break
+            sha256.update(data)
+    return sha256.hexdigest()
diff --git a/fact_extractor/test/unit/helperFunctions/test_hash.py b/fact_extractor/test/unit/helperFunctions/test_hash.py
index ee6962fe..57a3ccd4 100644
--- a/fact_extractor/test/unit/helperFunctions/test_hash.py
+++ b/fact_extractor/test/unit/helperFunctions/test_hash.py
@@ -4,8 +4,10 @@
 @author: weidenba
 '''
 import unittest
+from pathlib import Path
+from tempfile import TemporaryDirectory

-from helperFunctions.hash import get_sha256
+from helperFunctions.hash import get_sha256, compute_sha256_of_file


 class Test_hash_generation(unittest.TestCase):
@@ -14,3 +16,15 @@ class Test_hash_generation(unittest.TestCase):

     def test_get_sha256(self):
         self.assertEqual(get_sha256(self.test_string), self.test_string_SHA256, "not correct from string")
+
+    def test_compute_sha256_of_file(self):
+        data = b'-' * 2000
+
+        with TemporaryDirectory(prefix='unit_test_') as tmp_dir:
+            test_file = Path(tmp_dir) / 'test_file'
+            with open(test_file, 'wb') as f:
+                f.write(data)
+
+            sha256 = compute_sha256_of_file(test_file)
+            # test that the computed sha256 is the same with both methods
+            self.assertEqual(get_sha256(data), sha256, "not correct from file")