Skip to content

Commit

Permalink
Changes for escaping Unicode surrogates in display name (log2timeline…
Browse files: browse the repository at this point in the history
  • Loading branch information
joachimmetz authored Jan 21, 2024
1 parent 4ca7060 commit 8ba364c
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 76 deletions.
14 changes: 2 additions & 12 deletions plaso/cli/status_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
"""The status view."""

import ctypes
import re
import sys
import time

Expand Down Expand Up @@ -37,8 +36,6 @@ class StatusView(object):
dfvfs_definitions.SOURCE_TYPE_STORAGE_MEDIA_IMAGE: (
'storage media image')}

_UNICODE_SURROGATES_RE = re.compile('[\ud800-\udfff]')

_UNITS_1024 = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'EiB', 'ZiB', 'YiB']

_WINAPI_STD_OUTPUT_HANDLE = -11
Expand Down Expand Up @@ -256,15 +253,8 @@ def _GetPathSpecificationString(self, path_spec):
Returns:
str: printable string representation of the path specification.
"""
path_spec_string = path_spec.comparable

if self._UNICODE_SURROGATES_RE.search(path_spec_string):
path_spec_string = path_spec_string.encode(
'utf-8', errors='surrogateescape')
path_spec_string = path_spec_string.decode(
'utf-8', errors='backslashreplace')

return path_spec_string
return path_spec.comparable.translate(
definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE)

def _PrintAnalysisStatusUpdateFile(self, processing_status):
"""Prints an analysis status update in file mode.
Expand Down
15 changes: 3 additions & 12 deletions plaso/cli/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import codecs
import datetime
import locale
import re
import sys
import time
import textwrap
Expand All @@ -21,6 +20,7 @@

from plaso.cli import logger
from plaso.cli import views
from plaso.lib import definitions
from plaso.lib import errors


Expand All @@ -42,8 +42,6 @@ class CLITool(object):
# The fallback preferred encoding.
_PREFERRED_ENCODING = 'utf-8'

_UNICODE_SURROGATES_RE = re.compile('[\ud800-\udfff]')

def __init__(self, input_reader=None, output_writer=None):
"""Initializes a command line interface tool.
Expand Down Expand Up @@ -154,15 +152,8 @@ def _GetPathSpecificationString(self, path_spec):
if not path_spec:
return 'N/A'

path_spec_string = path_spec.comparable

if self._UNICODE_SURROGATES_RE.search(path_spec_string):
path_spec_string = path_spec_string.encode(
'utf-8', errors='surrogateescape')
path_spec_string = path_spec_string.decode(
'utf-8', errors='backslashreplace')

return path_spec_string
return path_spec.comparable.translate(
definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE)

def _ParseInformationalOptions(self, options):
"""Parses the informational options.
Expand Down
15 changes: 3 additions & 12 deletions plaso/engine/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
"""Extractor classes, used to extract information from sources."""

import copy
import re

import pysigscan

Expand All @@ -12,6 +11,7 @@
from dfvfs.resolver import resolver as path_spec_resolver

from plaso.engine import logger
from plaso.lib import definitions
from plaso.lib import errors
from plaso.parsers import interface as parsers_interface
from plaso.parsers import manager as parsers_manager
Expand Down Expand Up @@ -349,8 +349,6 @@ class PathSpecExtractor(object):

_MAXIMUM_DEPTH = 255

_UNICODE_SURROGATES_RE = re.compile('[\ud800-\udfff]')

def _ExtractPathSpecsFromDirectory(self, file_entry, depth=0):
"""Extracts path specification from a directory.
Expand Down Expand Up @@ -492,15 +490,8 @@ def _GetPathSpecificationString(self, path_spec):
Returns:
str: printable string representation of the path specification.
"""
path_spec_string = path_spec.comparable

if self._UNICODE_SURROGATES_RE.search(path_spec_string):
path_spec_string = path_spec_string.encode(
'utf-8', errors='surrogateescape')
path_spec_string = path_spec_string.decode(
'utf-8', errors='backslashreplace')

return path_spec_string
return path_spec.comparable.translate(
definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE)

def ExtractPathSpecs(
self, path_spec, find_specs=None, recurse_file_system=True,
Expand Down
45 changes: 18 additions & 27 deletions plaso/engine/path_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,16 @@
"""The path helper."""

import os
import re

from dfvfs.lib import definitions as dfvfs_definitions

from plaso.engine import logger
from plaso.lib import definitions


class PathHelper(object):
"""Class that implements the path helper."""

_NON_PRINTABLE_CHARACTERS = list(range(0, 0x20)) + list(range(0x7f, 0xa0))
_ESCAPE_CHARACTERS = str.maketrans({
value: '\\x{0:02x}'.format(value)
for value in _NON_PRINTABLE_CHARACTERS})

_RECURSIVE_GLOB_LIMIT = 10

_PATH_EXPANSIONS_PER_USERS_VARIABLE = {
Expand All @@ -31,8 +26,6 @@ class PathHelper(object):
'%%users.temp%%': [
['%%users.localappdata%%', 'Temp']]}

_UNICODE_SURROGATES_RE = re.compile('[\ud800-\udfff]')

@classmethod
def _ExpandUsersHomeDirectoryPathSegments(
cls, path_segments, path_separator, user_accounts):
Expand Down Expand Up @@ -180,20 +173,20 @@ def ExpandGlobStars(cls, path, path_separator):
recursion_depth = int(path_segment[2:], 10)
except (TypeError, ValueError):
logger.warning((
'Globstar with suffix "{0:s}" in path "{1:s}" not '
'supported.').format(path_segment, path))
f'Globstar with suffix "{path_segment:s}" in path "{path:s}" '
f'not supported.'))

elif '**' in path_segment:
logger.warning((
'Globstar with prefix "{0:s}" in path "{1:s}" not '
'supported.').format(path_segment, path))
f'Globstar with prefix "{path_segment:s}" in path "{path:s}" not '
f'supported.'))

if recursion_depth is not None:
if recursion_depth <= 1 or recursion_depth > cls._RECURSIVE_GLOB_LIMIT:
logger.warning((
'Globstar "{0:s}" in path "{1:s}" exceed recursion maximum '
'recursion depth, limiting to: {2:d}.').format(
path_segment, path, cls._RECURSIVE_GLOB_LIMIT))
f'Globstar "{path_segment:s}" in path "{path:s}" exceed '
f'recursion maximum recursion depth, limiting to: '
f'{cls._RECURSIVE_GLOB_LIMIT:d}.'))
recursion_depth = cls._RECURSIVE_GLOB_LIMIT

next_segment_index = segment_index + 1
Expand Down Expand Up @@ -323,18 +316,16 @@ def GetDisplayNameForPathSpec(cls, path_spec):
elif path_spec.type_indicator == dfvfs_definitions.TYPE_INDICATOR_GZIP:
parent_path_spec = parent_path_spec.parent

display_name = ':'.join([path_type_indicator, relative_path])
display_name = display_name.translate(
definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE)

if parent_path_spec and parent_path_spec.type_indicator == (
dfvfs_definitions.TYPE_INDICATOR_VSHADOW):
store_index = getattr(path_spec.parent, 'store_index', None)
if store_index is not None:
return 'VSS{0:d}:{1:s}:{2:s}'.format(
store_index + 1, path_spec.type_indicator, relative_path)

display_name = '{0:s}:{1:s}'.format(path_type_indicator, relative_path)

if cls._UNICODE_SURROGATES_RE.search(display_name):
display_name = display_name.encode('utf-8', errors='surrogateescape')
display_name = display_name.decode('utf-8', errors='backslashreplace')
store_index += 1
return f'VSS{store_index:d}:{display_name:s}'

return display_name

Expand Down Expand Up @@ -364,12 +355,12 @@ def GetRelativePathForPathSpec(cls, path_spec):
if not location:
return None

location = location.translate(cls._ESCAPE_CHARACTERS)

data_stream = getattr(path_spec, 'data_stream', None)
if data_stream:
data_stream = data_stream.translate(cls._ESCAPE_CHARACTERS)
location = '{0:s}:{1:s}'.format(location, data_stream)
location = ':'.join([location, data_stream])

location = location.translate(
definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE)

if path_spec.type_indicator != dfvfs_definitions.TYPE_INDICATOR_OS:
return location
Expand Down
14 changes: 2 additions & 12 deletions plaso/multi_process/extraction_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import logging
import multiprocessing
import os
import re
import time
import traceback

Expand Down Expand Up @@ -106,8 +105,6 @@ class ExtractionMultiProcessEngine(task_engine.TaskMultiProcessEngine):

_TASK_QUEUE_TIMEOUT_SECONDS = 2

_UNICODE_SURROGATES_RE = re.compile('[\ud800-\udfff]')

_WORKER_PROCESSES_MINIMUM = 2
_WORKER_PROCESSES_MAXIMUM = 99

Expand Down Expand Up @@ -366,15 +363,8 @@ def _GetPathSpecificationString(self, path_spec):
Returns:
str: printable string representation of the path specification.
"""
path_spec_string = path_spec.comparable

if self._UNICODE_SURROGATES_RE.search(path_spec_string):
path_spec_string = path_spec_string.encode(
'utf-8', errors='surrogateescape')
path_spec_string = path_spec_string.decode(
'utf-8', errors='backslashreplace')

return path_spec_string
return path_spec.comparable.translate(
definitions.NON_PRINTABLE_CHARACTER_TRANSLATION_TABLE)

def _MergeAttributeContainer(self, storage_writer, merge_helper, container):
"""Merges an attribute container from a task store into the storage writer.
Expand Down
3 changes: 2 additions & 1 deletion tests/engine/path_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,8 @@ def testGetDisplayNameForPathSpec(self):
parent=os_path_spec)

expected_display_name = (
'TAR:/ustar/umlauts-\\xc4\\xd6\\xdc\\xe4\\xf6\\xfc\\xdf')
'TAR:/ustar/umlauts-\\U0000dcc4\\U0000dcd6\\U0000dcdc\\U0000dce4'
'\\U0000dcf6\\U0000dcfc\\U0000dcdf')
display_name = path_helper.PathHelper.GetDisplayNameForPathSpec(
tar_path_spec)
self.assertEqual(display_name, expected_display_name)
Expand Down

0 comments on commit 8ba364c

Please sign in to comment.