From 7bb818b985cc971930f93b0581b963334f7a2b68 Mon Sep 17 00:00:00 2001 From: Dean Pemberton Date: Sat, 26 Oct 2019 01:48:20 +1300 Subject: [PATCH] Adding the ability to tell the Indicator class not to lowercase indicators. (#136) * Adding the ability to tell the Indicator class not to lowercase indicators. * Ensure that the default bahaviour matches the existing class behaviour (ie to lowercase indicators) --- csirtg_indicator/indicator.py | 59 +++++++++++++++++++++++------------ test/test_indicator.py | 30 ++++++++++++++++-- 2 files changed, 67 insertions(+), 22 deletions(-) diff --git a/csirtg_indicator/indicator.py b/csirtg_indicator/indicator.py index 323b78d..8218ef2 100644 --- a/csirtg_indicator/indicator.py +++ b/csirtg_indicator/indicator.py @@ -1,4 +1,18 @@ # -*- coding: utf-8 -*- +from pprint import pprint +import uuid +import logging +from .constants import PYVERSION, IPV4_PRIVATE_NETS, PROTOCOL_VERSION, FIELDS, FIELDS_TIME, LOG_FORMAT +from base64 import b64encode +from .exceptions import InvalidIndicator +from . import VERSION +from .utils import parse_timestamp, resolve_itype, is_subdomain, ipv4_normalize +import pytricia +import codecs +from datetime import datetime +from argparse import ArgumentParser, RawDescriptionHelpFormatter +import textwrap +import json import sys if sys.version_info > (3,): from urllib.parse import urlparse @@ -6,21 +20,6 @@ else: from urlparse import urlparse -import json -import textwrap -from argparse import ArgumentParser, RawDescriptionHelpFormatter -from datetime import datetime -import codecs -import pytricia -from .utils import parse_timestamp, resolve_itype, is_subdomain, ipv4_normalize -from . import VERSION -from .exceptions import InvalidIndicator -from base64 import b64encode -from .constants import PYVERSION, IPV4_PRIVATE_NETS, PROTOCOL_VERSION, FIELDS, FIELDS_TIME, LOG_FORMAT -import logging -import uuid - -from pprint import pprint IPV4_PRIVATE = pytricia.PyTricia() @@ -32,6 +31,8 @@ class Indicator(object): def __init__(self, indicator=None, version=PROTOCOL_VERSION, **kwargs): self.version = version + self._lowercase = True + self._lowercase = kwargs.get('lowercase', True) for k in FIELDS: if k in ['indicator', 'confidence', 'count']: # handle this at the end @@ -48,7 +49,8 @@ def __init__(self, indicator=None, version=PROTOCOL_VERSION, **kwargs): continue if isinstance(kwargs[k], basestring): - kwargs[k] = kwargs[k].lower() + if self._lowercase is True: + kwargs[k] = kwargs[k].lower() if k in ['tags', 'peers']: kwargs[k] = kwargs[k].split(',') @@ -87,15 +89,20 @@ def indicator(self, i): try: i = codecs.unicode_escape_encode(i.decode('utf-8'))[0] except Exception: - i = codecs.unicode_escape_encode(i.encode('utf-8', 'ignore').decode('utf-8'))[0] + i = codecs.unicode_escape_encode( + i.encode('utf-8', 'ignore').decode('utf-8'))[0] - i = i.lower() + if self._lowercase is True: + i = i.lower() self.itype = resolve_itype(i) self._indicator = i if self.itype == 'url': u = urlparse(self._indicator) - self._indicator = u.geturl().rstrip('/').lower() + if self._lowercase is True: + self._indicator = u.geturl().rstrip('/').lower() + else: + self._indicator = u.geturl().rstrip('/') if self.itype == 'ipv4': self._indicator = ipv4_normalize(self._indicator) @@ -165,6 +172,18 @@ def confidence(self, v): def confidence(self): return self._confidence + @property + def lowercase(self): + return self._lowercase + + @lowercase.setter + def lowercase(self, v): + self._lowercase = float(v) + + @lowercase.getter + def lowercase(self): + return self._lowercase + @property def count(self): return self._count @@ -244,7 +263,7 @@ def __repr__(self): v = v.strftime("%Y-%m-%dT%H:%M:%S.%fZ") if isinstance(v, basestring): - if k is not 'message' and not k.endswith('time'): + if k is not 'message' and not k.endswith('time') and self._lowercase is False: v = v.lower() if k == 'confidence': diff --git a/test/test_indicator.py b/test/test_indicator.py index 314d4bc..52f738b 100644 --- a/test/test_indicator.py +++ b/test/test_indicator.py @@ -3,6 +3,7 @@ from csirtg_indicator.exceptions import InvalidIndicator from random import randint, uniform + def test_indicator_ipv4(): i = Indicator('192.168.1.1') assert i.is_private() @@ -29,6 +30,30 @@ def test_indicator_url(): assert 'malware' in i.tags +def test_indicator_mixedcase_lower_false(): + i = Indicator('http://example.org/MiXeDCaSe', + tags='botnet,malware', lowercase=False) + + assert i.is_private() is False + assert i.indicator == 'http://example.org/MiXeDCaSe' + assert i.itype is not 'fqdn' + assert i.itype is 'url' + assert 'botnet' in i.tags + assert 'malware' in i.tags + + +def test_indicator_mixedcase_lower_true(): + i = Indicator('http://example.org/MiXeDCaSe', + tags='botnet,malware', lowercase=True) + + assert i.is_private() is False + assert i.indicator == 'http://example.org/mixedcase' + assert i.itype is not 'fqdn' + assert i.itype is 'url' + assert 'botnet' in i.tags + assert 'malware' in i.tags + + def test_indicator_str(): i = Indicator('http://example.org', tags='botnet,malware') @@ -68,7 +93,8 @@ def test_format_indicator(): def test_indicator_dest(): - i = Indicator(indicator='192.168.1.1', dest='10.0.0.1', portlist="23", protocol="tcp", dest_portlist='21,22-23') + i = Indicator(indicator='192.168.1.1', dest='10.0.0.1', + portlist="23", protocol="tcp", dest_portlist='21,22-23') assert i.dest assert i.dest_portlist @@ -109,4 +135,4 @@ def test_eq(): u2 = Indicator(indicator='192.168.1.1') u2.uuid = u1.uuid - assert u1 == u2 \ No newline at end of file + assert u1 == u2