Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding additional filtering for generic carver #83

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions fact_extractor/helperFunctions/hash.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import hashlib
from hashlib import new

from helperFunctions.dataConversion import make_bytes

HASH_BUFFER_SIZE = 64 * 1024 # 64KB

def get_hash(hash_function, binary):
binary = make_bytes(binary)
@@ -13,3 +15,15 @@ def get_hash(hash_function, binary):

def get_sha256(code):
    """Return the SHA-256 digest of ``code`` via the generic get_hash() helper."""
    hash_name = 'sha256'
    return get_hash(hash_name, code)


def compute_sha256_of_file(file_path, chunk_size=None):
    """Compute the SHA-256 hex digest of the contents of the file at ``file_path``.

    The file is read in fixed-size chunks so arbitrarily large files can be
    hashed without loading them fully into memory.

    :param file_path: path (str or Path) of the file to hash
    :param chunk_size: bytes read per iteration; defaults to HASH_BUFFER_SIZE
    :return: hexadecimal SHA-256 digest string
    """
    if chunk_size is None:
        chunk_size = HASH_BUFFER_SIZE
    sha256 = hashlib.sha256()
    with open(file_path, 'rb') as file_object:
        # iter() with a b'' sentinel stops cleanly at EOF (read() returns b'').
        for chunk in iter(lambda: file_object.read(chunk_size), b''):
            sha256.update(chunk)
    return sha256.hexdigest()
Original file line number Diff line number Diff line change
@@ -3,14 +3,15 @@
'''
import logging
import shutil
import zlib
from pathlib import Path

from common_helper_process import execute_shell_command
from common_helper_process import execute_shell_command, execute_interactive_shell_command
from fact_helper_file import get_file_type_from_path

NAME = 'generic_carver'
MIME_PATTERNS = ['generic/carver']
VERSION = '0.8'
VERSION = '0.9'

TAR_MAGIC = b'ustar'

@@ -21,36 +22,53 @@ def unpack_function(file_path, tmp_dir):
tmp_dir should be used to store the extracted files.
'''

logging.debug('File Type unknown: execute binwalk on {}'.format(file_path))
output = execute_shell_command(f'binwalk --extract --carve --signature --directory {tmp_dir} {file_path}')

logging.debug(f'File Type unknown: execute binwalk on {file_path}')
output = execute_interactive_shell_command(f'binwalk --run-as=root --extract --carve --signature --directory {tmp_dir} {file_path}', timeout=600)
drop_underscore_directory(tmp_dir)
return {'output': output, 'filter_log': ArchivesFilter(tmp_dir).remove_false_positive_archives()}
return {'output': output, 'filter_log': ArchivesFilter(tmp_dir, original_file=file_path).remove_false_positive_archives()}


class ArchivesFilter:
def __init__(self, unpack_directory):
def __init__(self, unpack_directory, original_file=None):
    """Prepare a false-positive filter for carved files below ``unpack_directory``.

    :param unpack_directory: directory containing the carved files to screen
    :param original_file: optional path of the carved input file; when given,
        its size is recorded so whole-file duplicates can be dropped later
    """
    # Size of the original input (None when unknown); consumed by
    # check_file_size_same_as_original().
    self.original_size = Path(original_file).stat().st_size if original_file else None
    self.unpack_directory = Path(unpack_directory)
    self.screening_logs = []

def remove_false_positive_archives(self) -> str:
    """Screen every carved file and delete the ones that look like false positives.

    Files that are the same size as the original input are dropped outright;
    archive-like files are validated with the matching archive tool or zlib.

    :return: the collected screening log messages, newline-joined
    """
    # MIME type -> (validation shell command, error marker to search for).
    simple_checks = {
        'application/x-xz': ('xz -c -d {} | wc -c', None),
        'application/gzip': ('gzip -c -d {} | wc -c', None),
        'application/zip': ('7z l {}', 'ERROR'),
        'application/x-7z-compressed': ('7z l {}', 'ERROR'),
        'application/x-lzma': ('7z l {}', 'ERROR'),
    }
    for carved_file in self.unpack_directory.iterdir():
        mime = get_file_type_from_path(carved_file)['mime']

        # A carved file identical in size to the original input is not a
        # useful extraction result — drop it and move on.
        if self.check_file_size_same_as_original(carved_file):
            continue

        if mime == 'application/x-tar' or self._is_possible_tar(mime, carved_file):
            self.check_archives_validity(carved_file, 'tar -tvf {}', 'does not look like a tar archive')
        elif mime in simple_checks:
            command, error_marker = simple_checks[mime]
            if error_marker is None:
                self.check_archives_validity(carved_file, command)
            else:
                self.check_archives_validity(carved_file, command, error_marker)
        elif mime in ('compression/zlib', 'application/zlib'):
            self.check_zlib_archive_validity(carved_file)

    return '\n'.join(self.screening_logs)

def check_file_size_same_as_original(self, file_path: Path) -> bool:
    """Remove ``file_path`` if it has exactly the size of the original input.

    binwalk occasionally "carves" a copy of the whole input (typically named
    0.yyy), which is useless — such files are deleted.

    :return: True when the file was removed, False otherwise
    """
    if self.original_size is None:
        return False
    if file_path.stat().st_size != self.original_size:
        return False
    # NOTE(review): equal size is only a heuristic for "identical to the
    # original" — a same-sized but different carved file would also be
    # removed here; a content hash comparison would be safer. TODO confirm.
    self.remove_file(file_path)
    return True

@staticmethod
def _is_possible_tar(file_type: str, file_path: Path) -> bool:
# broken tar archives may be identified as octet-stream by newer versions of libmagic
@@ -78,6 +96,21 @@ def remove_file(self, file_path):
def output_is_empty(output):
    """Return True when the last whitespace-separated token of ``output`` is 0
    (e.g. the byte count reported by a ``... | wc -c`` validation command)."""
    last_token = output.split()[-1]
    return int(last_token) == 0

def check_zlib_archive_validity(self, file_path):
    """Remove ``file_path`` unless its contents are valid zlib data that
    decompress to at least one byte."""
    with open(file_path, 'rb') as compressed_file:
        payload = compressed_file.read()

    try:
        # A zero-length decompression result counts as a false positive too.
        is_valid = len(zlib.decompress(payload)) > 0
    except zlib.error:
        is_valid = False

    if not is_valid:
        self.remove_file(file_path)


def drop_underscore_directory(tmp_dir):
extracted_contents = list(Path(tmp_dir).iterdir())
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import shutil
from pathlib import Path
from tempfile import TemporaryDirectory
import pytest

import pytest
from helperFunctions.file_system import get_test_data_dir
from plugins.unpacking.generic_carver.code.generic_carver import ArchivesFilter
from test.unit.unpacker.test_unpacker import TestUnpackerBase
from helperFunctions.file_system import get_test_data_dir

# pylint: disable=protected-access

@@ -32,11 +32,30 @@ def test_extraction_of_filtered_files(self):
assert len(files) == 0
assert 'was removed' in meta_data['filter_log']

in_file = str(TEST_DATA_DIR / 'fake_ubi')
files, meta_data = self.unpacker._extract_files_from_file_using_specific_unpacker(in_file, self.tmp_dir.name, self.unpacker.unpacker_plugins['generic/carver'])
assert len(files) == 0
assert 'was removed' in meta_data['filter_log']


@pytest.mark.parametrize('filename', ['fake_zip.zip', 'fake_tar.tar', 'fake_7z.7z', 'fake_xz.xz', 'fake_gz.gz'])
@pytest.mark.parametrize('filename', ['fake_zip.zip', 'fake_tar.tar', 'fake_7z.7z', 'fake_xz.xz', 'fake_gz.gz', 'fake_zlib.zlib'])
def test_remove_false_positives(filename):
    # Each fixture is a broken/fake archive: the filter must delete it.
    with TemporaryDirectory() as temp_dir:
        target = Path(temp_dir) / filename
        shutil.copyfile(TEST_DATA_DIR / filename, target)
        ArchivesFilter(temp_dir).remove_false_positive_archives()
        assert not target.is_file()


def test_remove_self():
    # A carved file that is byte-identical (same size) to the original input
    # must be treated as a duplicate and deleted by the filter.
    with TemporaryDirectory() as temp_dir:
        payload = b'A' * 80
        original = Path(temp_dir) / 'orginal_file_1'
        carved = Path(temp_dir) / 'test_file_1'

        for current_file in (original, carved):
            current_file.write_bytes(payload)

        ArchivesFilter(temp_dir, original_file=str(original)).remove_false_positive_archives()
        assert not carved.is_file()
16 changes: 15 additions & 1 deletion fact_extractor/test/unit/helperFunctions/test_hash.py
Original file line number Diff line number Diff line change
@@ -4,8 +4,10 @@
@author: weidenba
'''
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory

from helperFunctions.hash import get_sha256
from helperFunctions.hash import get_sha256, compute_sha256_of_file


class Test_hash_generation(unittest.TestCase):
@@ -14,3 +16,15 @@ class Test_hash_generation(unittest.TestCase):

def test_get_sha256(self):
    # Known-answer check against the reference digest stored on the class.
    computed = get_sha256(self.test_string)
    self.assertEqual(computed, self.test_string_SHA256, "not correct from string")

def test_compute_sha256_of_file(self):
    payload = b'-' * 2000

    with TemporaryDirectory(prefix='unit_test_') as tmp_dir:
        target = Path(tmp_dir) / 'test_file'
        target.write_bytes(payload)

        # The streaming file hash must agree with the in-memory hash helper.
        self.assertEqual(get_sha256(payload), compute_sha256_of_file(target), "not correct from string")