diff --git a/fact_extractor/config/main.cfg b/fact_extractor/config/main.cfg deleted file mode 100644 index 5061222d..00000000 --- a/fact_extractor/config/main.cfg +++ /dev/null @@ -1,10 +0,0 @@ -[unpack] -blacklist = audio/mpeg, image/png, image/jpeg, image/gif, application/x-shockwave-flash, video/mp4, video/mpeg, video/quicktime, video/x-msvideo, video/ogg, text/plain, application/pdf -data_folder = /tmp/extractor -exclude = - -[ExpertSettings] -statistics = True -unpack_threshold = 0.8 -header_overhead = 256 -compressed_file_types = application/x-shockwave-flash, audio/mpeg, audio/ogg, image/png, image/jpeg, image/gif, video/mp4, video/ogg diff --git a/fact_extractor/config/main.toml b/fact_extractor/config/main.toml new file mode 100644 index 00000000..1cfc3092 --- /dev/null +++ b/fact_extractor/config/main.toml @@ -0,0 +1,43 @@ +[unpack] +blacklist = [ + "application/json", + "application/pdf", + "application/x-java-applet", + "application/x-object", + "application/x-sharedlib", + "application/x-shockwave-flash", + "application/x-terminfo", + "audio/mpeg", + "font/sfnt", + "image/bmp", + "image/gif", + "image/jpeg", + "image/png", + "image/x-tga", + "image/x-win-bitmap", + "inode/symlink", + "text/csv", + "text/plain", + "video/mp4", + "video/mpeg", + "video/ogg", + "video/quicktime", + "video/x-msvideo", +] +data_folder = "/tmp/extractor" +exclude = [] + +[expert_settings] +statistics = true +unpack_threshold = 0.8 +header_overhead = 256 +compressed_file_types = [ + "application/x-shockwave-flash", + "audio/mpeg", + "audio/ogg", + "image/gif", + "image/jpeg", + "image/png", + "video/mp4", + "video/ogg", +] diff --git a/fact_extractor/docker_extraction.py b/fact_extractor/docker_extraction.py index f38946d4..0817aa27 100755 --- a/fact_extractor/docker_extraction.py +++ b/fact_extractor/docker_extraction.py @@ -20,9 +20,9 @@ from pathlib import Path import sys -from helperFunctions.config import get_config_dir +from helperFunctions.config import load_config from helperFunctions.file_system import change_owner_of_output_files -from helperFunctions.program_setup import check_ulimits, load_config, setup_logging +from helperFunctions.program_setup import check_ulimits, setup_logging from unpacker.unpack import unpack @@ -41,17 +41,18 @@ def _parse_args(): def main(args): - config = load_config(f'{get_config_dir()}/main.cfg') + config = load_config() setup_logging(debug=False) check_ulimits() - input_dir = Path(config.get('unpack', 'data_folder'), 'input') + data_folder = Path(config.unpack.data_folder) + input_dir = data_folder / 'input' input_file = list(input_dir.iterdir())[0] unpack(input_file, config, args.extract_everything) if args.chown: - output_dir = Path(config.get('unpack', 'data_folder'), 'files') + output_dir = data_folder / 'files' return change_owner_of_output_files(output_dir, args.chown) return 0 diff --git a/fact_extractor/fact_extract.py b/fact_extractor/fact_extract.py index 078d57fe..2442dc21 100755 --- a/fact_extractor/fact_extract.py +++ b/fact_extractor/fact_extract.py @@ -20,7 +20,8 @@ import sys from pathlib import Path -from helperFunctions.program_setup import setup_argparser, setup_logging, load_config +from helperFunctions.config import load_config +from helperFunctions.program_setup import setup_argparser, setup_logging from unpacker.unpack import unpack @@ -30,7 +31,7 @@ def main(): setup_logging(arguments.debug, log_file=arguments.log_file, log_level=arguments.log_level) # Make sure report folder exists some meta.json can be written - report_folder = Path(config.get('unpack', 'data_folder'), 'reports') + report_folder = Path(config.unpack.data_folder, 'reports') report_folder.mkdir(parents=True, exist_ok=True) unpack(arguments.FILE_PATH, config) diff --git a/fact_extractor/helperFunctions/config.py b/fact_extractor/helperFunctions/config.py index 85be1a1a..c9b66409 100644 --- a/fact_extractor/helperFunctions/config.py +++ b/fact_extractor/helperFunctions/config.py @@ -1,41 +1,41 @@ -import os -from configparser import ConfigParser, NoOptionError, NoSectionError +from pathlib import Path +from typing import List + +import toml +from pydantic import BaseModel, Field from helperFunctions.file_system import get_src_dir -def load_config(config_file_name): - ''' - loads config of CONFIG_DIR/config_file_name - Returns config object - ''' - config = ConfigParser() - config_path = '{}/{}'.format(get_config_dir(), config_file_name) - if os.path.exists(config_path): - config.read(config_path) - return config - raise RuntimeError('Cannot load config') +class ExtractorUnpackConfig(BaseModel): + blacklist: List[str] = Field(default_factory=list) + data_folder: str = "/tmp/extractor" + exclude: List[str] = Field(default_factory=list) -def get_config_dir(): - ''' - Returns the absolute path of the config directory - ''' - return '{}/config'.format(get_src_dir()) +class ExpertSettings(BaseModel): + statistics: bool = True + unpack_threshold: float = 0.8 + header_overhead: int = 256 + compressed_file_types: List[str] = Field(default_factory=list) + +class FactExtractorConfig(BaseModel): + unpack: ExtractorUnpackConfig + expert_settings: ExpertSettings -def read_list_from_config(config_file: ConfigParser, section: str, key: str, default=None): - if default is None: - default = [] - if not config_file: - return default +def load_config(config_file_name: str = 'main.toml') -> FactExtractorConfig: + ''' + loads config of CONFIG_DIR/config_file_name + Returns config object + ''' + config_path = get_config_dir() / config_file_name + if config_path.is_file(): + cfg_data = toml.loads(config_path.read_text()) + return FactExtractorConfig(**cfg_data) + raise RuntimeError('Cannot load config') - try: - config_entry = config_file.get(section, key) - except (NoOptionError, NoSectionError): - return default - if not config_entry: - return default - return [item.strip() for item in config_entry.split(',') if item] +def get_config_dir() -> Path: + return Path(get_src_dir()) / 'config' diff --git a/fact_extractor/helperFunctions/install.py b/fact_extractor/helperFunctions/install.py index cc0194e7..921f0d0f 100644 --- a/fact_extractor/helperFunctions/install.py +++ b/fact_extractor/helperFunctions/install.py @@ -159,7 +159,7 @@ def _checkout_github_project(github_path, folder_name): def load_main_config(): config = configparser.ConfigParser() - config_path = Path(Path(__file__).parent.parent, 'config', 'main.cfg') + config_path = Path(Path(__file__).parent.parent, 'config', 'main.toml') if not config_path.is_file(): raise InstallationError(f'Could not load config at path {config_path}') config.read(str(config_path)) diff --git a/fact_extractor/helperFunctions/program_setup.py b/fact_extractor/helperFunctions/program_setup.py index 994bdcec..abb74fa9 100644 --- a/fact_extractor/helperFunctions/program_setup.py +++ b/fact_extractor/helperFunctions/program_setup.py @@ -1,5 +1,4 @@ import argparse -import configparser import logging import resource @@ -10,19 +9,53 @@ def setup_argparser(name, description, command_line_options, version=__VERSION__): - parser = argparse.ArgumentParser(description='{} - {}'.format(name, description)) - parser.add_argument('-V', '--version', action='version', version='{} {}'.format(name, version)) - parser.add_argument('-l', '--log_file', help='path to log file', default=None) - parser.add_argument('-L', '--log_level', help='define the log level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default=None) - parser.add_argument('-d', '--debug', action='store_true', default=False, help='print debug messages') - parser.add_argument('-C', '--config_file', help='set path to config File', default='{}/main.cfg'.format(get_config_dir())) - parser.add_argument('FILE_PATH', type=str, help='Path to file that should be extracted') + parser = argparse.ArgumentParser(description=f'{name} - {description}') + parser.add_argument( + '-V', + '--version', + action='version', + version=f'{name} {version}', + ) + parser.add_argument( + '-l', + '--log_file', + help='path to log file', + default=None, + ) + parser.add_argument( + '-L', + '--log_level', + help='define the log level', + choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], + default=None, + ) + parser.add_argument( + '-d', + '--debug', + action='store_true', + default=False, + help='print debug messages', + ) + parser.add_argument( + '-C', + '--config_file', + help='set path to config File', + default=f'{get_config_dir()}/main.cfg', + ) + parser.add_argument( + 'FILE_PATH', + type=str, + help='Path to file that should be extracted', + ) return parser.parse_args(command_line_options[1:]) def setup_logging(debug, log_file=None, log_level=None): log_level = log_level if log_level else logging.WARNING - log_format = logging.Formatter(fmt='[%(asctime)s][%(module)s][%(levelname)s]: %(message)s', datefmt='%Y-%m-%d %H:%M:%S') + log_format = logging.Formatter( + fmt='[%(asctime)s][%(module)s][%(levelname)s]: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + ) logger = logging.getLogger('') logger.setLevel(logging.DEBUG) @@ -48,9 +81,3 @@ def check_ulimits(): logging.info(f'The number of openable files has been raised from {soft} to {min(1024, hard)}.') elif soft == resource.RLIM_INFINITY or soft > 100000: logging.warning('Warning: A very high (or no) nofile limit will slow down fakeroot and cause other problems.') - - -def load_config(config_file): - config = configparser.ConfigParser() - config.read(config_file) - return config diff --git a/fact_extractor/helperFunctions/statistics.py b/fact_extractor/helperFunctions/statistics.py index 1333e70d..bd46a0f2 100644 --- a/fact_extractor/helperFunctions/statistics.py +++ b/fact_extractor/helperFunctions/statistics.py @@ -1,17 +1,19 @@ -from configparser import ConfigParser +from __future__ import annotations + from contextlib import suppress -from pathlib import Path -from typing import Dict, List +from typing import TYPE_CHECKING from common_helper_files import safe_rglob -from common_helper_unpacking_classifier import ( - avg_entropy, get_binary_size_without_padding, is_compressed -) +from common_helper_unpacking_classifier import avg_entropy, get_binary_size_without_padding, is_compressed from fact_helper_file import get_file_type_from_path -from helperFunctions.config import read_list_from_config + +from helperFunctions.config import FactExtractorConfig + +if TYPE_CHECKING: + from pathlib import Path -def add_unpack_statistics(extraction_dir: Path, meta_data: Dict): +def add_unpack_statistics(extraction_dir: Path, meta_data: dict): unpacked_files, unpacked_directories = 0, 0 for extracted_item in safe_rglob(extraction_dir): if extracted_item.is_file(): @@ -23,21 +25,31 @@ def add_unpack_statistics(extraction_dir: Path, meta_data: Dict): meta_data['number_of_unpacked_directories'] = unpacked_directories -def get_unpack_status(file_path: str, binary: bytes, extracted_files: List[Path], meta_data: Dict, config: ConfigParser): +def get_unpack_status( + file_path: str, binary: bytes, extracted_files: list[Path], meta_data: dict, config: FactExtractorConfig +): meta_data['summary'] = [] meta_data['entropy'] = avg_entropy(binary) if not extracted_files and meta_data.get('number_of_excluded_files', 0) == 0: - if get_file_type_from_path(file_path)['mime'] in read_list_from_config(config, 'ExpertSettings', 'compressed_file_types')\ - or not is_compressed(binary, compress_entropy_threshold=config.getfloat('ExpertSettings', 'unpack_threshold'), classifier=avg_entropy): + mime = get_file_type_from_path(file_path)['mime'] + if mime in config.expert_settings.compressed_file_types or not _is_compressed(binary, config): meta_data['summary'] = ['unpacked'] else: meta_data['summary'] = ['packed'] else: - _detect_unpack_loss(binary, extracted_files, meta_data, config.getint('ExpertSettings', 'header_overhead')) + _detect_unpack_loss(binary, extracted_files, meta_data, config.expert_settings.header_overhead) + + +def _is_compressed(binary, config: FactExtractorConfig): + return is_compressed( + binary, + compress_entropy_threshold=config.expert_settings.unpack_threshold, + classifier=avg_entropy, + ) -def _detect_unpack_loss(binary: bytes, extracted_files: List[Path], meta_data: Dict, header_overhead: int): +def _detect_unpack_loss(binary: bytes, extracted_files: list[Path], meta_data: dict, header_overhead: int): decoding_overhead = 1 - meta_data.get('encoding_overhead', 0) cleaned_size = get_binary_size_without_padding(binary) * decoding_overhead - header_overhead size_of_extracted_files = _total_size_of_extracted_files(extracted_files) @@ -46,7 +58,7 @@ def _detect_unpack_loss(binary: bytes, extracted_files: List[Path], meta_data: D meta_data['summary'] = ['data lost'] if cleaned_size > size_of_extracted_files else ['no data lost'] -def _total_size_of_extracted_files(extracted_files: List[Path]) -> int: +def _total_size_of_extracted_files(extracted_files: list[Path]) -> int: total_size = 0 for item in extracted_files: with suppress(OSError): diff --git a/fact_extractor/server.py b/fact_extractor/server.py index a0625c50..e2a486cc 100644 --- a/fact_extractor/server.py +++ b/fact_extractor/server.py @@ -6,15 +6,15 @@ from flask import Flask from flask_restful import Api, Resource -from helperFunctions.config import load_config +from helperFunctions.config import load_config, FactExtractorConfig from helperFunctions.file_system import change_owner_of_output_files from helperFunctions.program_setup import setup_logging from unpacker.unpack import unpack app = Flask(__name__) api = Api(app) -config = load_config('main.cfg') -setup_logging(False, log_level=int(os.getenv('LOG_LEVEL', logging.WARNING))) # pylint: disable=invalid-envvar-default +config: FactExtractorConfig = load_config('main.toml') +setup_logging(False, log_level=int(os.getenv('LOG_LEVEL', logging.WARNING))) @api.resource('/start/', methods=['GET']) @@ -23,7 +23,7 @@ def __init__(self): self.owner = os.getenv('CHMOD_OWNER', None) def get(self, folder): - input_dir = Path(config.get('unpack', 'data_folder'), folder, 'input') + input_dir = Path(config.unpack.data_folder, folder, 'input') try: input_file = list(input_dir.iterdir())[0] unpack(file_path=str(input_file), config=config, folder=folder) diff --git a/fact_extractor/test/data/helperFunctions/test.cfg b/fact_extractor/test/data/helperFunctions/test.cfg index f3887895..de906d8f 100644 --- a/fact_extractor/test/data/helperFunctions/test.cfg +++ b/fact_extractor/test/data/helperFunctions/test.cfg @@ -1,2 +1,24 @@ -[test] -test = test_config \ No newline at end of file +[unpack] +blacklist = [ + "image/bmp", + "image/gif", +] +data_folder = "/tmp/extractor" +exclude = [ + "foobar", +] + +[expert_settings] +statistics = false +unpack_threshold = 0.8 +header_overhead = 256 +compressed_file_types = [ + "application/x-shockwave-flash", + "audio/mpeg", + "audio/ogg", + "image/gif", + "image/jpeg", + "image/png", + "video/mp4", + "video/ogg", +] diff --git a/fact_extractor/test/unit/helperFunctions/test_config.py b/fact_extractor/test/unit/helperFunctions/test_config.py index 7bbe7503..a46747b6 100644 --- a/fact_extractor/test/unit/helperFunctions/test_config.py +++ b/fact_extractor/test/unit/helperFunctions/test_config.py @@ -1,47 +1,11 @@ -import os - -import pytest +from pathlib import Path +from helperFunctions.config import load_config from helperFunctions.file_system import get_test_data_dir -from helperFunctions.config import get_config_dir, load_config, read_list_from_config - - -def test_get_config_dir(): - assert os.path.exists('{}/main.cfg'.format(get_config_dir())), 'main config file not found' def test_load_config(monkeypatch): - monkeypatch.setattr('helperFunctions.config.get_config_dir', lambda: '{}/helperFunctions'.format(get_test_data_dir())) - test_config = load_config('test.cfg') - assert test_config['test']['test'] == 'test_config', 'config not correct' - - -@pytest.mark.parametrize('input_data, expected', [ - ('', []), - ('item1', ['item1']), - ('item1, item2, item3', ['item1', 'item2', 'item3']), - ('item1,item2,item3', ['item1', 'item2', 'item3']), - (' item1 , item2 , item3 ', ['item1', 'item2', 'item3']), -]) -def test_read_list_from_config(monkeypatch, input_data, expected): - monkeypatch.setattr('helperFunctions.config.get_config_dir', lambda: '{}/helperFunctions'.format(get_test_data_dir())) + monkeypatch.setattr('helperFunctions.config.get_config_dir', lambda: Path(f'{get_test_data_dir()}/helperFunctions')) test_config = load_config('test.cfg') - test_config.add_section('test_section') - test_config.set('test_section', 'test_option', input_data) - result = read_list_from_config(test_config, 'test_section', 'test_option') - assert result == expected - - -def test_read_list_from_config__key_not_in_config(monkeypatch): - monkeypatch.setattr('helperFunctions.config.get_config_dir', lambda: '{}/helperFunctions'.format(get_test_data_dir())) - test_config = load_config('test.cfg') - result = read_list_from_config(test_config, 'foo', 'bar') - assert result == [] - - result = read_list_from_config(test_config, 'test', 'bar') - assert result == [] - - -def test_read_list_from_config__no_config(monkeypatch): - result = read_list_from_config(None, 'foo', 'bar') - assert result == [] + assert test_config.unpack.exclude == ["foobar"] + assert test_config.expert_settings.statistics is False diff --git a/fact_extractor/test/unit/helperFunctions/test_program_setup.py b/fact_extractor/test/unit/helperFunctions/test_program_setup.py index 8a8e3dae..04a9e5e4 100644 --- a/fact_extractor/test/unit/helperFunctions/test_program_setup.py +++ b/fact_extractor/test/unit/helperFunctions/test_program_setup.py @@ -2,12 +2,10 @@ from pathlib import Path from helperFunctions.config import get_config_dir -from helperFunctions.program_setup import ( - load_config, setup_logging, setup_argparser -) +from helperFunctions.program_setup import setup_logging, setup_argparser -class ArgumentMock(): +class ArgumentMock: config_file = '{}/main.cfg'.format(get_config_dir()) log_level = 'WARNING' log_file = '/tmp/fact_test_log' @@ -15,10 +13,6 @@ class ArgumentMock(): debug = False -def test_load_config(): - config = load_config('{}/main.cfg'.format(get_config_dir())) - assert config['ExpertSettings']['unpack_threshold'] == '0.8' - def test_setup_logging(): args = ArgumentMock() diff --git a/fact_extractor/test/unit/helperFunctions/test_statistics.py b/fact_extractor/test/unit/helperFunctions/test_statistics.py index facc5009..77c1b103 100644 --- a/fact_extractor/test/unit/helperFunctions/test_statistics.py +++ b/fact_extractor/test/unit/helperFunctions/test_statistics.py @@ -1,8 +1,8 @@ -from configparser import ConfigParser from pathlib import Path import pytest +from helperFunctions.config import FactExtractorConfig from helperFunctions.file_system import get_test_data_dir from helperFunctions.statistics import get_unpack_status, _detect_unpack_loss @@ -14,11 +14,10 @@ def common_tmpdir(tmpdir): @pytest.fixture(scope='function') def config_fixture(common_tmpdir): - config = ConfigParser() - config.add_section('unpack') - config.set('unpack', 'data_folder', str(common_tmpdir)) - config.add_section('ExpertSettings') - config.set('ExpertSettings', 'unpack_threshold', '0.8') + config = FactExtractorConfig( + unpack={'data_folder': str(common_tmpdir)}, + expert_settings={'unpack_threshold': 0.8} + ) return config @@ -32,7 +31,7 @@ def test_unpack_status_packed_file(config_fixture): assert result['summary'] == ['packed'], '7z file should be packed' result = dict() - config_fixture.set('ExpertSettings', 'compressed_file_types', 'application/x-7z-compressed, ') + config_fixture.expert_settings.compressed_file_types = ['application/x-7z-compressed'] get_unpack_status(test_packed_file_path, test_packed_file_path.read_bytes(), list(), result, config_fixture) assert result['summary'] == ['unpacked'], 'Unpacking Whitelist does not work' diff --git a/fact_extractor/test/unit/unpacker/test_unpacker.py b/fact_extractor/test/unit/unpacker/test_unpacker.py index 9c9506ed..1b3e51e2 100644 --- a/fact_extractor/test/unit/unpacker/test_unpacker.py +++ b/fact_extractor/test/unit/unpacker/test_unpacker.py @@ -1,32 +1,32 @@ -# pylint: disable=attribute-defined-outside-init - from __future__ import annotations import gc import json import os import shutil -from configparser import ConfigParser from pathlib import Path from tempfile import TemporaryDirectory from unittest.mock import Mock, patch +from helperFunctions.config import FactExtractorConfig from helperFunctions.file_system import get_test_data_dir from unpacker.unpack import Unpacker class TestUnpackerBase: def setup_method(self): - self.config = ConfigParser() - self.ds_tmp_dir = TemporaryDirectory(prefix='fact_tests_') self.tmp_dir = TemporaryDirectory(prefix='fact_tests_') - - self.config.add_section('unpack') - self.config.set('unpack', 'data_folder', self.ds_tmp_dir.name) - self.config.set('unpack', 'blacklist', 'text/plain, image/png') - self.config.add_section('ExpertSettings') - self.config.set('ExpertSettings', 'header_overhead', '256') - self.config.set('ExpertSettings', 'unpack_threshold', '0.8') + self.ds_tmp_dir = TemporaryDirectory(prefix='fact_tests_') + self.config = FactExtractorConfig( + unpack={ + 'data_folder': self.ds_tmp_dir.name, + 'blacklist': ['text/plain', 'image/png'], + }, + expert_settings={ + 'header_overhead': 256, + 'unpack_threshold': 0.8, + }, + ) self.unpacker = Unpacker(config=self.config) os.makedirs(str(self.unpacker._report_folder), exist_ok=True) # pylint: disable=protected-access @@ -169,15 +169,15 @@ def test_main_unpack_function(self): def test_main_unpack_exclude_archive(self): test_file_path = Path(get_test_data_dir(), 'container/test.zip') - self.unpacker.exclude = ['*test.zip'] + self.unpacker.config.unpack.exclude = ['*test.zip'] self.main_unpack_check(test_file_path, 0, 1, None) def test_main_unpack_exclude_subdirectory(self): test_file_path = Path(get_test_data_dir(), 'container/test.zip') - self.unpacker.exclude = ['*/generic folder/*'] + self.unpacker.config.unpack.exclude = ['*/generic folder/*'] self.main_unpack_check(test_file_path, 2, 1, '7z') def test_main_unpack_exclude_files(self): test_file_path = Path(get_test_data_dir(), 'container/test.zip') - self.unpacker.exclude = ['*/get_files_test/*test*'] + self.unpacker.config.unpack.exclude = ['*/get_files_test/*test*'] self.main_unpack_check(test_file_path, 0, 3, '7z') diff --git a/fact_extractor/unpacker/unpack.py b/fact_extractor/unpacker/unpack.py index 98fb3901..214ca03e 100644 --- a/fact_extractor/unpacker/unpack.py +++ b/fact_extractor/unpacker/unpack.py @@ -7,6 +7,7 @@ from tempfile import TemporaryDirectory from typing import List, Dict, Tuple +from helperFunctions.config import FactExtractorConfig from helperFunctions.dataConversion import ReportEncoder from helperFunctions.file_system import file_is_empty from helperFunctions.statistics import get_unpack_status, add_unpack_statistics @@ -18,9 +19,9 @@ class Unpacker(UnpackBase): FS_FALLBACK_CANDIDATES = ['SquashFS'] CARVER_FALLBACK_BLACKLIST = ['generic_carver', 'NOP', 'PaTool', 'SFX', 'LinuxKernel'] - def __init__(self, config=None, extract_everything: bool = False, folder: str = None): + def __init__(self, config: FactExtractorConfig = None, extract_everything: bool = False, folder: str = None): super().__init__(config=config, extract_everything=extract_everything) - data_folder = Path(self.config.get('unpack', 'data_folder')) + data_folder = Path(self.config.unpack.data_folder) if folder: self._file_folder = data_folder / folder / 'files' self._report_folder = data_folder / folder / 'reports' @@ -35,7 +36,7 @@ def unpack(self, file_path): 'number_of_unpacked_files': 0, 'number_of_unpacked_directories': 0, 'number_of_excluded_files': 1, - 'info': f'File was ignored because it matched the exclude list {self.exclude}', + 'info': f'File was ignored because it matched the exclude list {self.config.unpack.exclude}', } extracted_files = [] else: @@ -50,8 +51,7 @@ def unpack(self, file_path): extracted_files = self.move_extracted_files(extracted_files, Path(tmp_dir.name)) - compute_stats = self.config.getboolean('ExpertSettings', 'statistics', fallback=True) - if compute_stats: + if self.config.expert_settings.statistics: binary = Path(file_path).read_bytes() add_unpack_statistics(self._file_folder, meta_data) get_unpack_status(file_path, binary, extracted_files, meta_data, self.config) @@ -106,7 +106,12 @@ def move_extracted_files(self, file_paths: List[str], extraction_dir: Path) -> L return extracted_files -def unpack(file_path: str, config, extract_everything: bool = False, folder: str | None = None): +def unpack( + file_path: str, + config: FactExtractorConfig, + extract_everything: bool = False, + folder: str | None = None, +): extracted_objects = Unpacker(config, extract_everything, folder).unpack(file_path) logging.info(f'{len(extracted_objects)} files extracted') path_extracted_files = '\n'.join((str(path) for path in extracted_objects)) diff --git a/fact_extractor/unpacker/unpackBase.py b/fact_extractor/unpacker/unpackBase.py index fb0d629f..ff44a7b9 100644 --- a/fact_extractor/unpacker/unpackBase.py +++ b/fact_extractor/unpacker/unpackBase.py @@ -7,18 +7,17 @@ from common_helper_files import get_files_in_dir from fact_helper_file import get_file_type_from_path -from helperFunctions.config import read_list_from_config +from helperFunctions.config import FactExtractorConfig from helperFunctions.plugin import import_plugins -class UnpackBase(object): +class UnpackBase: ''' The unpacker module unpacks all files included in a file ''' - def __init__(self, config=None, extract_everything: bool = False): + def __init__(self, config: FactExtractorConfig = None, extract_everything: bool = False): self.config = config - self.exclude = read_list_from_config(config, 'unpack', 'exclude') self._setup_plugins() self.extract_everything = extract_everything @@ -35,9 +34,8 @@ def load_plugins(self): plugin.setup(self) def _set_whitelist(self): - self.blacklist = read_list_from_config(self.config, 'unpack', 'blacklist') - logging.debug(f'''Ignore (Blacklist): {', '.join(self.blacklist)}''') - for item in self.blacklist: + logging.debug(f'''Ignore (Blacklist): {', '.join(self.config.unpack.blacklist)}''') + for item in self.config.unpack.blacklist: self.register_plugin(item, self.unpacker_plugins['generic/nop']) def register_plugin(self, mime_type: str, unpacker_name_and_function: Tuple[Callable[[str, str], Dict], str, str]): @@ -62,7 +60,7 @@ def unpacking_fallback(self, file_path, tmp_dir, old_meta, fallback_plugin_mime) def _should_ignore(self, file): path = str(file) - for pattern in self.exclude: + for pattern in self.config.unpack.exclude: if fnmatch.fnmatchcase(path, pattern): return True return False @@ -90,7 +88,7 @@ def _extract_files_from_file_using_specific_unpacker(self, file_path: str, tmp_d out = get_files_in_dir(tmp_dir) - if self.exclude: + if self.config.unpack.exclude: # Remove paths that should be ignored excluded_count = len(out) out = [f for f in out if not self._should_ignore(f)] diff --git a/requirements-common.txt b/requirements-common.txt index 2f593086..c6244c2b 100644 --- a/requirements-common.txt +++ b/requirements-common.txt @@ -4,3 +4,4 @@ gunicorn~=21.2.0 pytest<8.1.1 pytest-cov~=5.0.0 testresources~=2.0.1 +toml~=0.10.2