diff --git a/plaso/cli/status_view.py b/plaso/cli/status_view.py index f396ee8e58..6357efbe1d 100644 --- a/plaso/cli/status_view.py +++ b/plaso/cli/status_view.py @@ -2,7 +2,6 @@ """The status view.""" import ctypes -import re import sys import time @@ -37,8 +36,6 @@ class StatusView(object): dfvfs_definitions.SOURCE_TYPE_STORAGE_MEDIA_IMAGE: ( 'storage media image')} - _UNICODE_SURROGATES_RE = re.compile('[\ud800-\udfff]') - _UNITS_1024 = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'EiB', 'ZiB', 'YiB'] _WINAPI_STD_OUTPUT_HANDLE = -11 @@ -256,15 +253,8 @@ def _GetPathSpecificationString(self, path_spec): Returns: str: printable string representation of the path specification. """ - path_spec_string = path_spec.comparable - - if self._UNICODE_SURROGATES_RE.search(path_spec_string): - path_spec_string = path_spec_string.encode( - 'utf-8', errors='surrogateescape') - path_spec_string = path_spec_string.decode( - 'utf-8', errors='backslashreplace') - - return path_spec_string + return path_spec.comparable.translate( + definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE) def _PrintAnalysisStatusUpdateFile(self, processing_status): """Prints an analysis status update in file mode. diff --git a/plaso/cli/tools.py b/plaso/cli/tools.py index 8c8ff0ba81..ec1b19c3d2 100644 --- a/plaso/cli/tools.py +++ b/plaso/cli/tools.py @@ -5,7 +5,6 @@ import codecs import datetime import locale -import re import sys import time import textwrap @@ -21,6 +20,7 @@ from plaso.cli import logger from plaso.cli import views +from plaso.lib import definitions from plaso.lib import errors @@ -42,8 +42,6 @@ class CLITool(object): # The fall back preferred encoding. _PREFERRED_ENCODING = 'utf-8' - _UNICODE_SURROGATES_RE = re.compile('[\ud800-\udfff]') - def __init__(self, input_reader=None, output_writer=None): """Initializes a command line interface tool. @@ -154,15 +152,8 @@ def _GetPathSpecificationString(self, path_spec): if not path_spec: return 'N/A' - path_spec_string = path_spec.comparable - - if self._UNICODE_SURROGATES_RE.search(path_spec_string): - path_spec_string = path_spec_string.encode( - 'utf-8', errors='surrogateescape') - path_spec_string = path_spec_string.decode( - 'utf-8', errors='backslashreplace') - - return path_spec_string + return path_spec.comparable.translate( + definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE) def _ParseInformationalOptions(self, options): """Parses the informational options. diff --git a/plaso/engine/extractors.py b/plaso/engine/extractors.py index d977fdf0e5..5509b68128 100644 --- a/plaso/engine/extractors.py +++ b/plaso/engine/extractors.py @@ -2,7 +2,6 @@ """Extractor classes, used to extract information from sources.""" import copy -import re import pysigscan @@ -12,6 +11,7 @@ from dfvfs.resolver import resolver as path_spec_resolver from plaso.engine import logger +from plaso.lib import definitions from plaso.lib import errors from plaso.parsers import interface as parsers_interface from plaso.parsers import manager as parsers_manager @@ -349,8 +349,6 @@ class PathSpecExtractor(object): _MAXIMUM_DEPTH = 255 - _UNICODE_SURROGATES_RE = re.compile('[\ud800-\udfff]') - def _ExtractPathSpecsFromDirectory(self, file_entry, depth=0): """Extracts path specification from a directory. @@ -492,15 +490,8 @@ def _GetPathSpecificationString(self, path_spec): Returns: str: printable string representation of the path specification. """ - path_spec_string = path_spec.comparable - - if self._UNICODE_SURROGATES_RE.search(path_spec_string): - path_spec_string = path_spec_string.encode( - 'utf-8', errors='surrogateescape') - path_spec_string = path_spec_string.decode( - 'utf-8', errors='backslashreplace') - - return path_spec_string + return path_spec.comparable.translate( + definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE) def ExtractPathSpecs( self, path_spec, find_specs=None, recurse_file_system=True, diff --git a/plaso/engine/path_helper.py b/plaso/engine/path_helper.py index 0f40e281d9..34e5a4d4e7 100644 --- a/plaso/engine/path_helper.py +++ b/plaso/engine/path_helper.py @@ -2,21 +2,16 @@ """The path helper.""" import os -import re from dfvfs.lib import definitions as dfvfs_definitions from plaso.engine import logger +from plaso.lib import definitions class PathHelper(object): """Class that implements the path helper.""" - _NON_PRINTABLE_CHARACTERS = list(range(0, 0x20)) + list(range(0x7f, 0xa0)) - _ESCAPE_CHARACTERS = str.maketrans({ - value: '\\x{0:02x}'.format(value) - for value in _NON_PRINTABLE_CHARACTERS}) - _RECURSIVE_GLOB_LIMIT = 10 _PATH_EXPANSIONS_PER_USERS_VARIABLE = { @@ -31,8 +26,6 @@ class PathHelper(object): '%%users.temp%%': [ ['%%users.localappdata%%', 'Temp']]} - _UNICODE_SURROGATES_RE = re.compile('[\ud800-\udfff]') - @classmethod def _ExpandUsersHomeDirectoryPathSegments( cls, path_segments, path_separator, user_accounts): @@ -180,20 +173,20 @@ def ExpandGlobStars(cls, path, path_separator): recursion_depth = int(path_segment[2:], 10) except (TypeError, ValueError): logger.warning(( - 'Globstar with suffix "{0:s}" in path "{1:s}" not ' - 'supported.').format(path_segment, path)) + f'Globstar with suffix "{path_segment:s}" in path "{path:s}" ' + f'not supported.')) elif '**' in path_segment: logger.warning(( - 'Globstar with prefix "{0:s}" in path "{1:s}" not ' - 'supported.').format(path_segment, path)) + f'Globstar with prefix "{path_segment:s}" in path "{path:s}" not ' + f'supported.')) if recursion_depth is not None: if recursion_depth <= 1 or recursion_depth > cls._RECURSIVE_GLOB_LIMIT: logger.warning(( - 'Globstar "{0:s}" in path "{1:s}" exceed recursion maximum ' - 'recursion depth, limiting to: {2:d}.').format( - path_segment, path, cls._RECURSIVE_GLOB_LIMIT)) + f'Globstar "{path_segment:s}" in path "{path:s}" exceed ' + f'recursion maximum recursion depth, limiting to: ' + f'{cls._RECURSIVE_GLOB_LIMIT:d}.')) recursion_depth = cls._RECURSIVE_GLOB_LIMIT next_segment_index = segment_index + 1 @@ -323,18 +316,16 @@ def GetDisplayNameForPathSpec(cls, path_spec): elif path_spec.type_indicator == dfvfs_definitions.TYPE_INDICATOR_GZIP: parent_path_spec = parent_path_spec.parent + display_name = ':'.join([path_type_indicator, relative_path]) + display_name = display_name.translate( + definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE) + if parent_path_spec and parent_path_spec.type_indicator == ( dfvfs_definitions.TYPE_INDICATOR_VSHADOW): store_index = getattr(path_spec.parent, 'store_index', None) if store_index is not None: - return 'VSS{0:d}:{1:s}:{2:s}'.format( - store_index + 1, path_spec.type_indicator, relative_path) - - display_name = '{0:s}:{1:s}'.format(path_type_indicator, relative_path) - - if cls._UNICODE_SURROGATES_RE.search(display_name): - display_name = display_name.encode('utf-8', errors='surrogateescape') - display_name = display_name.decode('utf-8', errors='backslashreplace') + store_index += 1 + return f'VSS{store_index:d}:{display_name:s}' return display_name @@ -364,12 +355,12 @@ def GetRelativePathForPathSpec(cls, path_spec): if not location: return None - location = location.translate(cls._ESCAPE_CHARACTERS) - data_stream = getattr(path_spec, 'data_stream', None) if data_stream: - data_stream = data_stream.translate(cls._ESCAPE_CHARACTERS) - location = '{0:s}:{1:s}'.format(location, data_stream) + location = ':'.join([location, data_stream]) + + location = location.translate( + definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE) if path_spec.type_indicator != dfvfs_definitions.TYPE_INDICATOR_OS: return location diff --git a/plaso/multi_process/extraction_engine.py b/plaso/multi_process/extraction_engine.py index f847511b1f..7cdaa10fce 100644 --- a/plaso/multi_process/extraction_engine.py +++ b/plaso/multi_process/extraction_engine.py @@ -6,7 +6,6 @@ import logging import multiprocessing import os -import re import time import traceback @@ -106,8 +105,6 @@ class ExtractionMultiProcessEngine(task_engine.TaskMultiProcessEngine): _TASK_QUEUE_TIMEOUT_SECONDS = 2 - _UNICODE_SURROGATES_RE = re.compile('[\ud800-\udfff]') - _WORKER_PROCESSES_MINIMUM = 2 _WORKER_PROCESSES_MAXIMUM = 99 @@ -366,15 +363,8 @@ def _GetPathSpecificationString(self, path_spec): Returns: str: printable string representation of the path specification. """ - path_spec_string = path_spec.comparable - - if self._UNICODE_SURROGATES_RE.search(path_spec_string): - path_spec_string = path_spec_string.encode( - 'utf-8', errors='surrogateescape') - path_spec_string = path_spec_string.decode( - 'utf-8', errors='backslashreplace') - - return path_spec_string + return path_spec.comparable.translate( + definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE) def _MergeAttributeContainer(self, storage_writer, merge_helper, container): """Merges an attribute container from a task store into the storage writer. diff --git a/tests/engine/path_helper.py b/tests/engine/path_helper.py index 9b9f6058a5..8170f60597 100644 --- a/tests/engine/path_helper.py +++ b/tests/engine/path_helper.py @@ -370,7 +370,8 @@ def testGetDisplayNameForPathSpec(self): parent=os_path_spec) expected_display_name = ( - 'TAR:/ustar/umlauts-\\xc4\\xd6\\xdc\\xe4\\xf6\\xfc\\xdf') + 'TAR:/ustar/umlauts-\\U0000dcc4\\U0000dcd6\\U0000dcdc\\U0000dce4' + '\\U0000dcf6\\U0000dcfc\\U0000dcdf') display_name = path_helper.PathHelper.GetDisplayNameForPathSpec( tar_path_spec) self.assertEqual(display_name, expected_display_name)