From 2ff843f98307452b891af50ebec2efe62b0f826a Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Tue, 9 Jan 2024 06:47:10 +0100 Subject: [PATCH] Worked on schema extraction script --- esedbrc/catalog_extractor.py | 111 ---------- esedbrc/database.py | 402 ----------------------------------- esedbrc/resources.py | 82 ------- esedbrc/schema_extractor.py | 327 ++++++++++++++++++++++++++++ scripts/extract.py | 362 ++++++++++--------------------- tests/catalog_extractor.py | 76 ------- tests/resources.py | 53 +---- tests/schema_extractor.py | 34 +++ 8 files changed, 487 insertions(+), 960 deletions(-) delete mode 100644 esedbrc/catalog_extractor.py delete mode 100644 esedbrc/database.py create mode 100644 esedbrc/schema_extractor.py delete mode 100644 tests/catalog_extractor.py create mode 100644 tests/schema_extractor.py diff --git a/esedbrc/catalog_extractor.py b/esedbrc/catalog_extractor.py deleted file mode 100644 index ae30fb4..0000000 --- a/esedbrc/catalog_extractor.py +++ /dev/null @@ -1,111 +0,0 @@ -#!/usr/bin/python -# -*- coding: utf-8 -*- -"""ESE database catalog extractor.""" - -import pyesedb - -from esedbrc import resources - - -class EseDbCatalogExtractor(object): - """ESE database catalog extractor.""" - - _TABLES_PER_DATABASE_TYPE = { - 'exchange': frozenset([ - 'Folders', 'Global', 'Mailbox', 'Msg', 'PerUserRead']), - 'search': frozenset([ - 'SystemIndex_0A', 'SystemIndex_Gthr']), - 'security': frozenset([ - 'SmTblSection', 'SmTblVersion']), - 'srum': frozenset([ - 'SruDbIdMapTable', '{D10CA2FE-6FCF-4F6D-848E-B2E99266FA86}', - '{D10CA2FE-6FCF-4F6D-848E-B2E99266FA89}', - '{FEE4E14F-02A9-4550-B5CE-5FA2DA202E37}', - '{973F5D5C-1D90-4944-BE8E-24B94231A174}', - '{FEE4E14F-02A9-4550-B5CE-5FA2DA202E37}LT', - '{DD6636C4-8929-4683-974E-22C046A43763}']), - 'webcache': frozenset([ - 'Containers', 'LeakFiles', 'Partitions']), - 'webcache_ex': frozenset([ - 'Containers', 'LeakFiles', 'PartitionsEx']), - } - - def __init__(self): - """Initializes an ESE database catalog extractor.""" - super(EseDbCatalogExtractor, self).__init__() - self._database_type = None - self._database_version = None - - def _DetermineDatabaseType(self, table_names): - """Determines the database type. - - Args: - table_names (set[str]): table names. - - Returns: - str: database type or None if the database type could not be determined. - """ - for database_type, database_table_names in ( - self._TABLES_PER_DATABASE_TYPE.items()): - if database_table_names.issubset(table_names): - return database_type - - return None - - def ExtractCatalog(self, filename, output_writer): - """Extracts the catalog from the database. - - Args: - filename (str): name of the file containing the ESE database. - output_writer (OutputWriter): output writer. - """ - esedb_file = pyesedb.file() - esedb_file.open(filename) - - # TODO: write an overview of the table names. - # TODO: write the table and index names per type and version. 
- - table_definitions = [] - for esedb_table in iter(esedb_file.tables): - table_definition = resources.EseTableDefinition( - esedb_table.name, esedb_table.template_name) - - for esedb_column in esedb_table.columns: - table_definition.AddColumnDefinition( - esedb_column.identifier, esedb_column.name, esedb_column.type) - - table_definitions.append(table_definition) - - unique_table_definitions = [] - for table_definition in table_definitions: - table_columns = [ - definition.CopyToDict() - for definition in table_definition.column_definitions] - - is_unique_table = True - for compare_table_definition in unique_table_definitions: - compare_table_columns = [ - definition.CopyToDict() - for definition in compare_table_definition.column_definitions] - - if table_columns == compare_table_columns: - compare_table_definition.aliases.append(table_definition.name) - is_unique_table = False - - if is_unique_table: - unique_table_definitions.append(table_definition) - - table_names = frozenset([ - table_definition.GetCommonName() - for table_definition in unique_table_definitions]) - - database_type = self._DetermineDatabaseType(table_names) - - ese_database_definition = resources.EseDatabaseDefinition( - database_type, 'unknown') - - output_writer.WriteDatabaseDefinition(ese_database_definition) - - output_writer.WriteTableDefinitions(unique_table_definitions) - - esedb_file.close() diff --git a/esedbrc/database.py b/esedbrc/database.py deleted file mode 100644 index 66660bb..0000000 --- a/esedbrc/database.py +++ /dev/null @@ -1,402 +0,0 @@ -# -*- coding: utf-8 -*- -"""Classes to read from and write to SQLite databases.""" - -import re - -import sqlite3 - - -class Sqlite3DatabaseFile(object): - """SQLite3 database file.""" - - _HAS_TABLE_QUERY = ( - 'SELECT name FROM sqlite_master ' - 'WHERE type = "table" AND name = "{0:s}"') - - def __init__(self): - """Initializes a database file.""" - super(Sqlite3DatabaseFile, self).__init__() - self._connection = None - self._cursor = None - self.filename = None - self.read_only = None - - def Close(self): - """Closes the database file. - - Raises: - RuntimeError: if the database is not opened. - """ - if not self._connection: - raise RuntimeError('Cannot close database not opened.') - - # We need to run commit or not all data is stored in the database. - self._connection.commit() - self._connection.close() - - self._connection = None - self._cursor = None - self.filename = None - self.read_only = None - - def CreateTable(self, table_name, column_definitions): - """Creates a table. - - Args: - table_name (str): table name. - column_definitions (list[str]): column definitions. - - Raises: - RuntimeError: if the database is not opened or - if the database is in read-only mode. - """ - if not self._connection: - raise RuntimeError('Cannot create table database not opened.') - - if self.read_only: - raise RuntimeError('Cannot create table database in read-only mode.') - - column_definitions = ', '.join(column_definitions) - self._cursor.execute( - f'CREATE TABLE {table_name:s} ( {column_definitions:s} )') - - def GetValues(self, table_names, column_names, condition): - """Retrieves values from a table. - - Args: - table_names (list[str]): table names. - column_names (list[str]): column names. - condition (str): condition. - - Yields: - sqlite3.row: a row. - - Raises: - RuntimeError: if the database is not opened. 
- """ - if not self._connection: - raise RuntimeError('Cannot retrieve values database not opened.') - - table_names = ', '.join(table_names) - column_names_string = ', '.join(column_names) - - sql_query = f'SELECT {column_names_string:s} FROM {table_names:s}' - if condition: - sql_query = ''.join([sql_query, f' WHERE {condition:s}']) - - self._cursor.execute(sql_query) - - for row in self._cursor: - values = {} - for column_index, column_name in enumerate(column_names): - values[column_name] = row[column_index] - yield values - - def HasTable(self, table_name): - """Determines if a specific table exists. - - Args: - table_name (str): table name. - - Returns: - bool: True if the table exists, False otheriwse. - - Raises: - RuntimeError: if the database is not opened. - """ - if not self._connection: - raise RuntimeError( - 'Cannot determine if table exists database not opened.') - - sql_query = self._HAS_TABLE_QUERY.format(table_name) - - self._cursor.execute(sql_query) - - return bool(self._cursor.fetchone()) - - def InsertValues(self, table_name, column_names, values): - """Inserts values into a table. - - Args: - table_name (str): table name. - column_names (list[str]): column names. - values (list[str]): values formatted as a string. - - Raises: - RuntimeError: if the database is not opened or - if the database is in read-only mode or - if an unsupported value type is encountered. - """ - if not self._connection: - raise RuntimeError('Cannot insert values database not opened.') - - if self.read_only: - raise RuntimeError('Cannot insert values database in read-only mode.') - - if not values: - return - - sql_values = [] - for value in values: - if isinstance(value, str): - # In sqlite3 the double quote is escaped with a second double quote. - value = re.sub('"', '""', value) - value = f'"{value:s}"' - elif isinstance(value, int): - value = f'{value:d}' - elif isinstance(value, float): - value = f'{value:f}' - elif value is None: - value = 'NULL' - else: - value_type = type(value) - raise RuntimeError(f'Unsupported value type: {value_type!s}.') - - sql_values.append(value) - - column_names = ', '.join(column_names) - sql_values = ', '.join(sql_values) - - self._cursor.execute( - f'INSERT INTO {table_name:s} ( {column_names:s} ) ' - f'VALUES ( {sql_values:s} )') - - def Open(self, filename, read_only=False): - """Opens the database file. - - Args: - filename (str): filename of the database. - read_only (Optional[bool]): True if the database should be opened in - read-only mode. Since sqlite3 does not support a real read-only - mode we fake it by only permitting SELECT queries. - - Returns: - bool: True if successful or False if not. - - Raises: - RuntimeError: if the database is already opened. - """ - if self._connection: - raise RuntimeError('Cannot open database already opened.') - - self.filename = filename - self.read_only = read_only - - self._connection = sqlite3.connect(filename) - if not self._connection: - return False - - self._cursor = self._connection.cursor() - if not self._cursor: - return False - - return True - - -class Sqlite3DatabaseReader(object): - """SQLite3 database reader.""" - - def __init__(self): - """Initializes a database reader.""" - super(Sqlite3DatabaseReader, self).__init__() - self._database_file = Sqlite3DatabaseFile() - - def Close(self): - """Closes the database reader object.""" - self._database_file.Close() - - def Open(self, filename): - """Opens the database reader object. - - Args: - filename (str): filename of the database. 
- - Returns: - bool: True if successful or False if not. - """ - return self._database_file.Open(filename, read_only=True) - - -class Sqlite3DatabaseWriter(object): - """SQLite3 database writer.""" - - def __init__(self): - """Initializes a database writer.""" - super(Sqlite3DatabaseWriter, self).__init__() - self._database_file = Sqlite3DatabaseFile() - - def Close(self): - """Closes the database writer object.""" - self._database_file.Close() - - def Open(self, filename): - """Opens the database writer object. - - Args: - filename (str): filename of the database. - - Returns: - bool: True if successful or False if not. - """ - self._database_file.Open(filename) - return True - - -class EseDbCatalogSqlite3DatabaseWriter(Sqlite3DatabaseWriter): - """ESE database catolog SQLite3 writer.""" - - def _GetDatabaseDefinitionKey(self, ese_database_definition): - """Retrieves the key of a database definition. - - Args: - ese_database_definition (EseDatabaseDefinition): database definition. - - Returns: - int: database definition key or None if no such value. - - Raises: - RuntimeError: if more than one value is found in the database. - """ - table_names = ['database_definitions'] - column_names = ['database_definition_key'] - condition = ( - f'type = "{ese_database_definition.type:s}" AND ' - f'version = "{ese_database_definition.version:s}"') - values_list = list(self._database_file.GetValues( - table_names, column_names, condition)) - - number_of_values = len(values_list) - if number_of_values == 0: - return None - - if number_of_values == 1: - values = values_list[0] - return values['database_definition_key'] - - raise RuntimeError('More than one value found in database.') - - def GetTableDefinitionKey(self, ese_table_definition): - """Retrieves the key of a database definition. - - Args: - ese_table_definition (EseTableDefinition): database definition. - - Returns: - int: database definition key or None if no such value. - - Raises: - RuntimeError: if more than one value is found in the database. - """ - table_names = ['table_definitions'] - column_names = ['table_definition_key'] - condition = f'name = "{ese_table_definition.name:s}"' - values_list = list(self._database_file.GetValues( - table_names, column_names, condition)) - - number_of_values = len(values_list) - if number_of_values == 0: - return None - - if number_of_values == 1: - values = values_list[0] - return values['table_definition_key'] - - raise RuntimeError('More than one value found in database.') - - def WriteColumnDefinition(self, table_definition_key, ese_column_definition): - """Writes the column definition. - - Args: - table_definition_key (int): table definition key. - ese_column_definition (EseColumnDefinition): column definition. - """ - table_name = 'column_definitions' - column_names = ['identifier', 'name', 'type', 'table_definition_key'] - - has_table = self._database_file.HasTable(table_name) - if not has_table: - column_definitions = [ - 'column_definition_key INTEGER PRIMARY KEY AUTOINCREMENT', - 'identifier TEXT', 'name TEXT', 'type TEXT', - 'table_definition_key INTEGER'] - self._database_file.CreateTable(table_name, column_definitions) - insert_values = True - - else: - condition = ( - f'name = "{ese_column_definition.name:s}" AND ' - f'table_definition_key = {table_definition_key:d}') - values_list = list(self._database_file.GetValues( - [table_name], column_names, condition)) - - number_of_values = len(values_list) - # TODO: check if more than 1 result. 
- insert_values = number_of_values == 0 - - if insert_values: - values = [ - ese_column_definition.identifier, ese_column_definition.name, - ese_column_definition.type, table_definition_key] - self._database_file.InsertValues(table_name, column_names, values) - - def WriteDatabaseDefinition(self, ese_database_definition): - """Writes the database definition. - - Args: - ese_database_definition (EseDatabaseDefinition): database definition. - """ - table_name = 'database_definitions' - column_names = ['type', 'version'] - - has_table = self._database_file.HasTable(table_name) - if not has_table: - column_definitions = [ - 'database_definition_key INTEGER PRIMARY KEY AUTOINCREMENT', - 'type TEXT', 'version TEXT'] - self._database_file.CreateTable(table_name, column_definitions) - insert_values = True - - else: - condition = ( - f'type = "{ese_database_definition.type:s}" AND ' - f'version = "{ese_database_definition.version:s}"') - values_list = list(self._database_file.GetValues( - [table_name], column_names, condition)) - - number_of_values = len(values_list) - # TODO: check if more than 1 result. - insert_values = number_of_values == 0 - - if insert_values: - values = [ese_database_definition.type, ese_database_definition.version] - self._database_file.InsertValues(table_name, column_names, values) - - def WriteTableDefinition(self, ese_table_definition): - """Writes the table definition. - - Args: - ese_table_definition (EseTableDefinition): table definition. - """ - table_name = 'table_definitions' - column_names = ['name'] - - has_table = self._database_file.HasTable(table_name) - if not has_table: - column_definitions = [ - 'table_definition_key INTEGER PRIMARY KEY AUTOINCREMENT', - 'name TEXT'] - self._database_file.CreateTable(table_name, column_definitions) - insert_values = True - - else: - condition = f'name = "{ese_table_definition.name:s}"' - values_list = list(self._database_file.GetValues( - [table_name], column_names, condition)) - - number_of_values = len(values_list) - # TODO: check if more than 1 result. - insert_values = number_of_values == 0 - - if insert_values: - values = [ese_table_definition.name] - self._database_file.InsertValues(table_name, column_names, values) diff --git a/esedbrc/resources.py b/esedbrc/resources.py index e0d295a..416c32e 100644 --- a/esedbrc/resources.py +++ b/esedbrc/resources.py @@ -55,26 +55,6 @@ def CopyToDict(self): 'type': self.type} -class EseDatabaseDefinition(object): - """ESE database definition. - - Attributes: - type (str): ESE database type. - version (str): ESE database version. - """ - - def __init__(self, database_type, database_version): - """Initializes an ESE database database definition. - - Args: - database_type (str): ESE database type. - database_version (str): ESE database version. - """ - super(EseDatabaseDefinition, self).__init__() - self.type = database_type - self.version = database_version - - class EseTableDefinition(object): """ESE database table definition. @@ -137,65 +117,3 @@ def GetCommonName(self): self._common_name = f'{self._common_name:s}#' return self._common_name - - -class ColumnOverlay(object): - """Column overlay. - - Attributes: - identifier (str): column identifier. - name (str): column name. - type (str): column type. - """ - - def __init__(self, column_identifier, column_name, column_type): - """Initializes a column overlay. - - Args: - column_identifier (str): column identifier. - column_name (str): column name. - column_type (str): column type. 
- """ - super(ColumnOverlay, self).__init__() - self.column_identifier = column_identifier - self.column_name = column_name - self.column_type = column_type - - @property - def comparable(self): - """str: comparable identifier.""" - return (f'identifier: {self.column_identifier:s}, name: ' - f'{self.column_name:s}, type: {self.column_type:s}') - - -class TableOverlay(object): - """Table overlay. - - Attributes: - name (str): table name. - """ - - def __init__(self, table_name): - """Initializes a table overlay. - - Args: - table_name (str): table name. - """ - super(TableOverlay, self).__init__() - self._column_overlays = {} - self.table_name = table_name - - def AddColumnOverlay(self, column_overlay): - """Adds a column overlay. - - Args: - column_overlay (ColumnOverlay): column overlay. - - Raises: - KeyError: if the column overlay is already set. - """ - if column_overlay.comparable in self._column_overlays: - raise KeyError( - f'Column overlay: {column_overlay.comparable:s} already set.') - - self._column_overlays[column_overlay.comparable] = column_overlay diff --git a/esedbrc/schema_extractor.py b/esedbrc/schema_extractor.py new file mode 100644 index 0000000..3991734 --- /dev/null +++ b/esedbrc/schema_extractor.py @@ -0,0 +1,327 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""ESE database schema extractor.""" + +import logging +import os + +import pyesedb + +from artifacts import definitions as artifacts_definitions +from artifacts import reader as artifacts_reader +from artifacts import registry as artifacts_registry + +from dfimagetools import definitions as dfimagetools_definitions +from dfimagetools import file_entry_lister + +from esedbrc import resources +from esedbrc import yaml_definitions_file + + +class EseDbSchemaExtractor(object): + """ESE database schema extractor.""" + + _DATABASE_DEFINITIONS_FILE = ( + os.path.join(os.path.dirname(__file__), 'data', 'known_databases.yaml')) + + _MINIMUM_FILE_SIZE = 16 + + def __init__(self, artifact_definitions, mediator=None): + """Initializes a ESE database file schema extractor. + + Args: + artifact_definitions (str): path to a single artifact definitions + YAML file or a directory of definitions YAML files. + mediator (Optional[dfvfs.VolumeScannerMediator]): a volume scanner + mediator. + """ + super(EseDbSchemaExtractor, self).__init__() + self._artifacts_registry = artifacts_registry.ArtifactDefinitionsRegistry() + self._known_database_definitions = {} + self._mediator = mediator + + if artifact_definitions: + reader = artifacts_reader.YamlArtifactsReader() + if os.path.isdir(artifact_definitions): + self._artifacts_registry.ReadFromDirectory(reader, artifact_definitions) + elif os.path.isfile(artifact_definitions): + self._artifacts_registry.ReadFromFile(reader, artifact_definitions) + + definitions_file = yaml_definitions_file.YAMLDatabaseDefinitionsFile() + for database_definition in definitions_file.ReadFromFile( + self._DATABASE_DEFINITIONS_FILE): + artifact_definition = self._artifacts_registry.GetDefinitionByName( + database_definition.artifact_definition) + if not artifact_definition: + logging.warning((f'Unknown artifact definition: ' + f'{database_definition.artifact_definition:s}')) + else: + self._known_database_definitions[ + database_definition.database_identifier] = artifact_definition + + def _CheckSignature(self, file_object): + """Checks the signature of a given database file-like object. + + Args: + file_object (dfvfs.FileIO): file-like object of the database. 
+ + Returns: + bool: True if the signature matches that of a ESE database, False + otherwise. + """ + if not file_object: + return False + + file_object.seek(4, os.SEEK_SET) + file_data = file_object.read(4) + return file_data == b'\xef\xcd\xab\x89' + + def _FormatSchemaAsYAML(self, schema): + """Formats a schema into YAML. + + Args: + schema (list[EseTableDefinition]): schema as unique table definitions or + None if the schema could not be retrieved. + + Returns: + str: schema formatted as YAML. + + Raises: + RuntimeError: if a query could not be parsed. + """ + lines = ['# esedb-kb database schema.'] + + for table_definition in sorted( + schema, key=lambda table_definition: table_definition.name): + lines.extend([ + '---', + f'table: {table_definition.name:s}', + 'columns:']) + + for column_definition in table_definition.column_definitions: + # TODO: convert type to human readable string. + lines.extend([ + f'- name: {column_definition.name:s}', + f' value_type: {column_definition.type:d}']) + + lines.append('') + return '\n'.join(lines) + + def _GetDatabaseIdentifier(self, path_segments): + """Determines the database identifier. + + Args: + path_segments (list[str]): path segments. + + Returns: + str: database identifier or None if the type could not be determined. + """ + # TODO: make comparison more efficient. + for database_identifier, artifact_definition in ( + self._known_database_definitions.items()): + for source in artifact_definition.sources: + if source.type_indicator in ( + artifacts_definitions.TYPE_INDICATOR_DIRECTORY, + artifacts_definitions.TYPE_INDICATOR_FILE, + artifacts_definitions.TYPE_INDICATOR_PATH): + for source_path in set(source.paths): + source_path_segments = source_path.split(source.separator) + + if not source_path_segments[0]: + source_path_segments = source_path_segments[1:] + + # TODO: add support for parameters. + last_index = len(source_path_segments) + for index in range(1, last_index + 1): + source_path_segment = source_path_segments[-index] + if not source_path_segment or len(source_path_segment) < 2: + continue + + if (source_path_segment[0] == '%' and + source_path_segment[-1] == '%'): + source_path_segments = source_path_segments[-index + 1:] + break + + if len(source_path_segments) > len(path_segments): + continue + + is_match = True + last_index = min(len(source_path_segments), len(path_segments)) + for index in range(1, last_index + 1): + source_path_segment = source_path_segments[-index] + # TODO: improve handling of * + if '*' in source_path_segment: + continue + + path_segment = path_segments[-index].lower() + source_path_segment = source_path_segment.lower() + + is_match = path_segment == source_path_segment + if not is_match: + break + + if is_match: + return database_identifier + + return None + + def _GetDatabaseSchema(self, database_path): + """Retrieves schema from given database. + + Args: + database_path (str): file path to database. + + Returns: + list[EseTableDefinition]: schema as unique table definitions or None if + the schema could not be retrieved. + """ + with open(database_path, 'rb') as file_object: + return self._GetDatabaseSchemaFromFileObject(file_object) + + def _GetDatabaseSchemaFromFileObject(self, file_object): + """Retrieves schema from given database file-like object. + + Args: + file_object (dfvfs.FileIO): file-like object of the database. + + Returns: + list[EseTableDefinition]: schema as unique table definitions or None if + the schema could not be retrieved. 
+ """ + esedb_file = pyesedb.file() + esedb_file.open_file_object(file_object) + + try: + table_definitions = [] + for esedb_table in iter(esedb_file.tables): + table_definition = resources.EseTableDefinition( + esedb_table.name, esedb_table.template_name) + + for esedb_column in esedb_table.columns: + table_definition.AddColumnDefinition( + esedb_column.identifier, esedb_column.name, esedb_column.type) + + table_definitions.append(table_definition) + + unique_table_definitions = [] + for table_definition in table_definitions: + table_columns = [ + definition.CopyToDict() + for definition in table_definition.column_definitions] + + is_unique_table = True + for compare_table_definition in unique_table_definitions: + compare_table_columns = [ + definition.CopyToDict() + for definition in compare_table_definition.column_definitions] + + if table_columns == compare_table_columns: + compare_table_definition.aliases.append(table_definition.name) + is_unique_table = False + + if is_unique_table: + unique_table_definitions.append(table_definition) + + finally: + esedb_file.close() + + # TODO: move schema into object. + return unique_table_definitions + + def GetDisplayPath(self, path_segments, data_stream_name=None): + """Retrieves a path to display. + + Args: + path_segments (list[str]): path segments of the full path of the file + entry. + data_stream_name (Optional[str]): name of the data stream. + + Returns: + str: path to display. + """ + display_path = '' + + path_segments = [ + segment.translate( + dfimagetools_definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE) + for segment in path_segments] + display_path = ''.join([display_path, '/'.join(path_segments)]) + + if data_stream_name: + data_stream_name = data_stream_name.translate( + dfimagetools_definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE) + display_path = ':'.join([display_path, data_stream_name]) + + return display_path or '/' + + def ExtractSchemas(self, path, options=None): + """Extracts database schemas from the path. + + Args: + path (str): path of a ESE database file or storage media image containing + ESE database files. + options (Optional[dfvfs.VolumeScannerOptions]): volume scanner options. If + None the default volume scanner options are used, which are defined in + the dfVFS VolumeScannerOptions class. + + Yields: + tuple[str, dict[str, str]]: known database type identifier or the name of + the ESE database file if not known and schema. + """ + entry_lister = file_entry_lister.FileEntryLister(mediator=self._mediator) + + base_path_specs = entry_lister.GetBasePathSpecs(path, options=options) + if not base_path_specs: + logging.warning( + f'Unable to determine base path specifications from: {path:s}') + + else: + for file_entry, path_segments in entry_lister.ListFileEntries( + base_path_specs): + if not file_entry.IsFile() or file_entry.size < self._MINIMUM_FILE_SIZE: + continue + + file_object = file_entry.GetFileObject() + if not self._CheckSignature(file_object): + continue + + display_path = self.GetDisplayPath(path_segments) + # logging.info( + # f'Extracting schema from database file: {display_path:s}') + + database_schema = self._GetDatabaseSchemaFromFileObject(file_object) + if database_schema is None: + logging.warning(( + f'Unable to determine schema from database file: ' + f'{display_path:s}')) + continue + + # TODO: improve support to determine identifier for single database + # file. 
+ database_identifier = self._GetDatabaseIdentifier(path_segments) + if not database_identifier: + logging.warning(( + f'Unable to determine known database identifier of file: ' + f'{display_path:s}')) + + database_identifier = path_segments[-1] + + yield database_identifier, database_schema + + def FormatSchema(self, schema, output_format): + """Formats a schema into a word-wrapped string. + + Args: + schema (dict[str, str]): schema as an SQL query per table name. + output_format (str): output format. + + Returns: + str: formatted schema. + + Raises: + RuntimeError: if a query could not be parsed. + """ + if output_format == 'yaml': + return self._FormatSchemaAsYAML(schema) + + raise RuntimeError(f'Unsupported output format: {output_format:s}') diff --git a/scripts/extract.py b/scripts/extract.py index a8f0eb5..5182655 100755 --- a/scripts/extract.py +++ b/scripts/extract.py @@ -7,196 +7,13 @@ import os import sys -from esedbrc import catalog_extractor -from esedbrc import definitions -from esedbrc import database +from dfvfs.helpers import command_line as dfvfs_command_line +from dfvfs.helpers import volume_scanner as dfvfs_volume_scanner +from dfvfs.lib import errors as dfvfs_errors +from dfimagetools import helpers -class Sqlite3OutputWriter(object): - """SQLite3 output writer.""" - - def __init__(self, databases_path): - """Initializes an output writer. - - Args: - databases_path (str): path to the database files. - """ - super(Sqlite3OutputWriter, self).__init__() - self._databases_path = databases_path - self._database_writer = None - - def _WriteTableDefinition(self, table_definition): - """Writes the table definition. - - Args: - table_definition (EseTableDefinition): table definition. - """ - # TODO: detect tables with duplicate names and different definitions. - self._database_writer.WriteTableDefinition(table_definition) - - table_definition_key = self._database_writer.GetTableDefinitionKey( - table_definition) - - for column_definition in table_definition.column_definitions: - self._database_writer.WriteColumnDefinition( - table_definition_key, column_definition) - - def Close(self): - """Closes the output writer object.""" - self._database_writer.Close() - self._database_writer = None - - def Open(self, database_type): - """Opens the output writer object. - - Args: - database_type (str): ESE database type. - - Returns: - bool: True if successful or False if not. - """ - if not os.path.isdir(self._databases_path): - return False - - self._database_writer = database.EseDbCatalogSqlite3DatabaseWriter() - self._database_writer.Open(os.path.join( - self._databases_path, f'{database_type:s}.db')) - - return True - - def WriteDatabaseDefinition(self, database_definition): - """Writes the database definition. - - Args: - database_definition (EseDatabaseDefinition): database definition. - """ - self._database_writer.WriteDatabaseDefinition(database_definition) - - def WriteTableDefinitions(self, table_definitions): - """Writes the table definitions. - - Args: - table_definitions (list[EseTableDefinition]): table definitions. - """ - for table_definition in table_definitions: - self._WriteTableDefinition(table_definition) - - -class StdoutWriter(object): - """Stdout output writer.""" - - def __init__(self): - """Initializes an output writer.""" - super(StdoutWriter, self).__init__() - self._database_type = None - - def _GetTableLinkName(self, common_table_name): - """Retrieves the table link name. - - Args: - common_table_name (str): common table name. 
- - Returns: - str: table link name. - """ - link_name = '_'.join(['table', common_table_name.lower()]) - if link_name.endswith('_#'): - link_name = link_name[:-2] - - return link_name - - def _WriteColumnDefinition(self, column_definition): - """Writes the column definition. - - Args: - column_definition (EseColumnDefinition): column definition. - """ - column_type = definitions.COLUMN_TYPE_DESCRIPTIONS.get( - column_definition.type, 'UNKNOWN') - print((f'| {column_definition.identifier:d} | {column_definition.name:s} | ' - f'{column_type:s}')) - - def _WriteTableDefinition(self, table_definition): - """Writes the table definition. - - Args: - table_definition (EseTableDefinition): table definition. - """ - self._WriteTableHeader(table_definition) - - for column_definition in table_definition.column_definitions: - self._WriteColumnDefinition(column_definition) - - self._WriteTableFooter() - - def _WriteTableFooter(self): - """Writes the table footer.""" - print('|===') - print('') - - def _WriteTableHeader(self, table_definition): - """Writes the table header. - - Args: - table_definition (EseTableDefinition): table definition. - """ - common_table_name = table_definition.GetCommonName() - link_name = self._GetTableLinkName(common_table_name) - - print(f'=== [[{link_name:s}]]{common_table_name:s}') - - if table_definition.template_table_name: - print(f'Template table: {table_definition.template_table_name:s}') - - print('') - print('[cols="1,3,5",options="header"]') - print('|===') - print('| Column indentifier | Column name | Column type') - - def Close(self): - """Closes the output writer object.""" - return - - def Open(self, database_type): # pylint: disable=unused-argument - """Opens the output writer object. - - Args: - database_type (str): ESE database type. - - Returns: - bool: True if successful or False if not. - """ - self._database_type = database_type - return True - - def WriteDatabaseDefinition(self, database_definition): - """Writes the database definition. - - Args: - database_definition (EseDatabaseDefinition): database definition. - """ - print(f'== {self._database_type:s} {database_definition.version:s}') - print('') - - def WriteTableDefinitions(self, table_definitions): - """Writes the table definitions. - - Args: - table_definitions (list[EseTableDefinition]): table definitions. - """ - print('=== Tables') - print('') - - for table_definition in table_definitions: - common_table_name = table_definition.GetCommonName() - link_name = self._GetTableLinkName(common_table_name) - - print(f'* <<{link_name:s},{common_table_name:s}>>') - - print('') - - for table_definition in table_definitions: - self._WriteTableDefinition(table_definition) +from esedbrc import schema_extractor def Main(): @@ -205,86 +22,141 @@ def Main(): Returns: bool: True if successful or False if not. 
""" - args_parser = argparse.ArgumentParser(description=( + argument_parser = argparse.ArgumentParser(description=( 'Extract the catalog from the ESE database file.')) - args_parser.add_argument( - 'source', action='store', nargs='?', default=None, - help='path of the ESE database file.', metavar='/mnt/c/') - - args_parser.add_argument( - 'database_type', action='store', nargs='?', default=None, - help='string that identifies the ESE database type.', - metavar='search') - - args_parser.add_argument( - 'database_version', action='store', nargs='?', default=None, - help='string that identifies the ESE database version.', - metavar='XP') - - args_parser.add_argument( - '--db', '--database', action='store', default=None, - help='directory to write the sqlite3 databases to.', - metavar='./esedb-kb/', dest='database') - - options = args_parser.parse_args() + # TODO: add data group. + argument_parser.add_argument( + '--artifact_definitions', '--artifact-definitions', + dest='artifact_definitions', type=str, metavar='PATH', action='store', + help=('Path to a directory or file containing the artifact definition ' + '.yaml files.')) + + argument_parser.add_argument( + '--output', dest='output', action='store', metavar='./sqlite-kb/', + default=None, help='Directory to write the output to.') + + # TODO: add source group. + argument_parser.add_argument( + '--back_end', '--back-end', dest='back_end', action='store', + metavar='NTFS', default=None, help='preferred dfVFS back-end.') + + argument_parser.add_argument( + '--partitions', '--partition', dest='partitions', action='store', + type=str, default=None, help=( + 'Define partitions to be processed. A range of partitions can be ' + 'defined as: "3..5". Multiple partitions can be defined as: "1,3,5" ' + '(a list of comma separated values). Ranges and lists can also be ' + 'combined as: "1,3..5". The first partition is 1. All partitions ' + 'can be specified with: "all".')) + + argument_parser.add_argument( + '--snapshots', '--snapshot', dest='snapshots', action='store', type=str, + default=None, help=( + 'Define snapshots to be processed. A range of snapshots can be ' + 'defined as: "3..5". Multiple snapshots can be defined as: "1,3,5" ' + '(a list of comma separated values). Ranges and lists can also be ' + 'combined as: "1,3..5". The first snapshot is 1. All snapshots can ' + 'be specified with: "all".')) + + argument_parser.add_argument( + '--volumes', '--volume', dest='volumes', action='store', type=str, + default=None, help=( + 'Define volumes to be processed. A range of volumes can be defined ' + 'as: "3..5". Multiple volumes can be defined as: "1,3,5" (a list ' + 'of comma separated values). Ranges and lists can also be combined ' + 'as: "1,3..5". The first volume is 1. 
All volumes can be specified ' + 'with: "all".')) + + argument_parser.add_argument( + 'source', nargs='?', action='store', metavar='image.raw', default=None, + help='path of a storage media image or ESE database file.') + + options = argument_parser.parse_args() if not options.source: print('Source value is missing.') print('') - args_parser.print_help() + argument_parser.print_help() print('') return False - if not os.path.exists(options.source): - print(f'No such source: {options.source:s}.') + if not options.artifact_definitions: + print('Path to artifact definitions is missing.') print('') - return False - - if not options.database_type: - print('Database type value is missing.') + argument_parser.print_help() print('') return False - if not options.database_version: - print('Database version value is missing.') - print('') - return False + if options.output: + if not os.path.exists(options.output): + os.mkdir(options.output) + + if not os.path.isdir(options.output): + print(f'{options.output:s} must be a directory') + print('') + return False + + helpers.SetDFVFSBackEnd(options.back_end) logging.basicConfig( level=logging.INFO, format='[%(levelname)s] %(message)s') - if options.database: - if not os.path.exists(options.database): - os.mkdir(options.database) + mediator = dfvfs_command_line.CLIVolumeScannerMediator() - if not os.path.isdir(options.database): - print(f'{options.database:s} must be a directory') - print('') - return False + volume_scanner_options = dfvfs_volume_scanner.VolumeScannerOptions() + volume_scanner_options.partitions = mediator.ParseVolumeIdentifiersString( + options.partitions) - output_writer = Sqlite3OutputWriter(options.database) + if options.snapshots == 'none': + volume_scanner_options.snapshots = ['none'] else: - output_writer = StdoutWriter() - - if not output_writer.Open(options.database_type): - print('Unable to open output writer.') + volume_scanner_options.snapshots = mediator.ParseVolumeIdentifiersString( + options.snapshots) + + volume_scanner_options.volumes = mediator.ParseVolumeIdentifiersString( + options.volumes) + + extractor = schema_extractor.EseDbSchemaExtractor( + options.artifact_definitions, mediator=mediator) + + try: + for database_identifier, database_schema in extractor.ExtractSchemas( + options.source, options=volume_scanner_options): + if not database_schema: + continue + + output_text = extractor.FormatSchema(database_schema, 'yaml') + if not options.output: + print(output_text) + else: + file_exists = False + output_file = None + for number in range(1, 99): + filename = f'{database_identifier:s}.{number:d}.yaml' + output_file = os.path.join(options.output, filename) + if not os.path.exists(output_file): + break + + with open(output_file, 'r', encoding='utf-8') as existing_file_object: + existing_output_text = existing_file_object.read() + if output_text == existing_output_text: + file_exists = True + break + + if not file_exists: + with open(output_file, 'w', encoding='utf-8') as output_file_object: + output_file_object.write(output_text) + + except dfvfs_errors.ScannerError as exception: + print(f'[ERROR] {exception!s}', file=sys.stderr) print('') return False - # TODO: do something with options.database_type, options.database_version - # or remove. - - extractor = catalog_extractor.EseDbCatalogExtractor() - - # TODO: read table and index overlays from file. - # maybe something for an export script. - # overlays = {} - - # TODO: add support to read multiple files from a directory. 
- - extractor.ExtractCatalog(options.source, output_writer) - output_writer.Close() + except KeyboardInterrupt: + print('Aborted by user.', file=sys.stderr) + print('') + return False return True diff --git a/tests/catalog_extractor.py b/tests/catalog_extractor.py deleted file mode 100644 index 033aae6..0000000 --- a/tests/catalog_extractor.py +++ /dev/null @@ -1,76 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -"""Tests for the ESE database catalog extractor.""" - -import unittest - -from esedbrc import catalog_extractor - -from tests import test_lib - - -class TestOutputWriter(object): - """Test output writer.""" - - def Close(self): - """Closes the output writer object.""" - return - - def Open(self, database_type): # pylint: disable=unused-argument - """Opens the output writer object. - - Args: - database_type (str): ESE database type. - - Returns: - bool: True if successful or False if not. - """ - return True - - def WriteDatabaseDefinition(self, database_definition): # pylint: disable=unused-argument - """Writes the database definition. - - Args: - database_definition (EseDatabaseDefinition): database definition. - """ - return - - def WriteTableDefinitions(self, table_definitions): # pylint: disable=unused-argument - """Writes the table definitions. - - Args: - table_definitions (list[EseTableDefinition]): table definitions. - """ - return - - -class EseDbCatalogExtractorTest(test_lib.BaseTestCase): - """Tests for the ESE database catalog extractor.""" - - # pylint: disable=protected-access - - def testInitialize(self): - """Tests the __init__ function.""" - test_extractor = catalog_extractor.EseDbCatalogExtractor() - self.assertIsNotNone(test_extractor) - - def testDetermineDatabaseType(self): - """Tests the _DetermineDatabaseType function.""" - test_extractor = catalog_extractor.EseDbCatalogExtractor() - - database_type = test_extractor._DetermineDatabaseType([ - 'SystemIndex_0A', 'SystemIndex_Gthr']) - self.assertEqual(database_type, 'search') - - def testExtractCatalog(self): - """Tests the ExtractCatalog function.""" - test_file_path = self._GetTestFilePath(['WebCacheV01.dat']) - self._SkipIfPathNotExists(test_file_path) - - test_extractor = catalog_extractor.EseDbCatalogExtractor() - test_output_writer = TestOutputWriter() - test_extractor.ExtractCatalog(test_file_path, test_output_writer) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/resources.py b/tests/resources.py index d492c87..999d16a 100644 --- a/tests/resources.py +++ b/tests/resources.py @@ -9,6 +9,15 @@ from tests import test_lib +class DatabaseDefinitionTest(test_lib.BaseTestCase): + """Tests for the database definition.""" + + def testInitialize(self): + """Tests the __init__ function.""" + database_definition = resources.DatabaseDefinition() + self.assertIsNotNone(database_definition) + + class EseColumnDefinitionTest(test_lib.BaseTestCase): """Tests for the ESE database column definition.""" @@ -30,15 +39,6 @@ def testCopyToDict(self): self.assertEqual(column_definition.CopyToDict(), expected_dict) -class EseDatabaseDefinitionTest(test_lib.BaseTestCase): - """Tests for the ESE database definition.""" - - def testInitialize(self): - """Tests the __init__ function.""" - database_definition = resources.EseDatabaseDefinition('type', 'version') - self.assertIsNotNone(database_definition) - - class EseTableDefinitionTest(test_lib.BaseTestCase): """Tests for the ESE table definition.""" @@ -53,40 +53,5 @@ def testAddColumnDefinition(self): 
table_definition.AddColumnDefinition('identifier', 'name', 'type') -class ColumnOverlayTest(test_lib.BaseTestCase): - """Tests for the column overlay.""" - - def testInitialize(self): - """Tests the __init__ function.""" - column_overlay = resources.ColumnOverlay('identifier', 'name', 'type') - self.assertIsNotNone(column_overlay) - - def testComparable(self): - """Tests the comparable property.""" - column_overlay = resources.ColumnOverlay('identifier', 'name', 'type') - - expected_comparable = 'identifier: identifier, name: name, type: type' - self.assertEqual(column_overlay.comparable, expected_comparable) - - -class TableOverlayTest(test_lib.BaseTestCase): - """Tests for the table overlay.""" - - def testInitialize(self): - """Tests the __init__ function.""" - table_overlay = resources.TableOverlay('name') - self.assertIsNotNone(table_overlay) - - def testAddColumnOverlay(self): - """Tests the AddColumnOverlay function.""" - table_overlay = resources.TableOverlay('name') - column_overlay = resources.ColumnOverlay('identifier', 'name', 'type') - - table_overlay.AddColumnOverlay(column_overlay) - - with self.assertRaises(KeyError): - table_overlay.AddColumnOverlay(column_overlay) - - if __name__ == '__main__': unittest.main() diff --git a/tests/schema_extractor.py b/tests/schema_extractor.py new file mode 100644 index 0000000..3d8d0cb --- /dev/null +++ b/tests/schema_extractor.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""Tests for the ESE database schema extractor.""" + +import unittest + +from esedbrc import schema_extractor + +from tests import test_lib + + +class EseDbSchemaExtractorTest(test_lib.BaseTestCase): + """Tests for the ESE database schema extractor.""" + + # pylint: disable=protected-access + + def testInitialize(self): + """Tests the __init__ function.""" + # TODO: pass artifact definitions path. + test_extractor = schema_extractor.EseDbSchemaExtractor(None) + self.assertIsNotNone(test_extractor) + + # TODO: add tests for _CheckSignature + # TODO: add tests for _FormatSchemaAsYAML + # TODO: add tests for _GetDatabaseSchema + # TODO: add tests for _GetDatabaseIdentifier + # TODO: add tests for _GetDatabaseSchemaFromFileObject + # TODO: add tests for GetDisplayPath + # TODO: add tests for ExtractSchemas + # TODO: add tests for FormatSchema + + +if __name__ == '__main__': + unittest.main()
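
For reference, a minimal driver sketch of the EseDbSchemaExtractor class this patch introduces, mirroring what the reworked scripts/extract.py does but without the dfVFS volume scanner options or an output directory. The artifact definitions path and the source path below are placeholders, not values from the patch.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Minimal sketch that drives EseDbSchemaExtractor (paths are placeholders)."""

from esedbrc import schema_extractor


def main():
  # Directory (or single file) of forensic artifact definition YAML files;
  # placeholder path, adjust to the local artifacts installation.
  artifact_definitions = '/usr/share/artifacts'

  # A single ESE database file or a storage media image; placeholder path.
  source = 'WebCacheV01.dat'

  extractor = schema_extractor.EseDbSchemaExtractor(artifact_definitions)

  # ExtractSchemas() yields (database identifier, table definitions) tuples;
  # FormatSchema() renders the unique table definitions as YAML.
  for database_identifier, database_schema in extractor.ExtractSchemas(source):
    if database_schema:
      print(f'# Schema of: {database_identifier:s}')
      print(extractor.FormatSchema(database_schema, 'yaml'))


if __name__ == '__main__':
  main()

Per _FormatSchemaAsYAML in the patch, each extracted schema is emitted as one YAML document per table, with a "table:" entry followed by a "columns:" list of name and value_type pairs.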