Skip to content
This repository has been archived by the owner on Jun 17, 2023. It is now read-only.

Commit

Permalink
Adding the ability to tell the Indicator class not to lowercase indic…
Browse files Browse the repository at this point in the history
…ators. (#136)

* Adding the ability to tell the Indicator class not to lowercase indicators.

* Ensure that the default bahaviour matches the existing class behaviour (ie to lowercase indicators)
  • Loading branch information
deanpemberton authored and wesyoung committed Oct 25, 2019
1 parent 8033200 commit 7bb818b
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 22 deletions.
59 changes: 39 additions & 20 deletions csirtg_indicator/indicator.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,25 @@
# -*- coding: utf-8 -*-
from pprint import pprint
import uuid
import logging
from .constants import PYVERSION, IPV4_PRIVATE_NETS, PROTOCOL_VERSION, FIELDS, FIELDS_TIME, LOG_FORMAT
from base64 import b64encode
from .exceptions import InvalidIndicator
from . import VERSION
from .utils import parse_timestamp, resolve_itype, is_subdomain, ipv4_normalize
import pytricia
import codecs
from datetime import datetime
from argparse import ArgumentParser, RawDescriptionHelpFormatter
import textwrap
import json
import sys
if sys.version_info > (3,):
from urllib.parse import urlparse
basestring = (str, bytes)
else:
from urlparse import urlparse

import json
import textwrap
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from datetime import datetime
import codecs
import pytricia
from .utils import parse_timestamp, resolve_itype, is_subdomain, ipv4_normalize
from . import VERSION
from .exceptions import InvalidIndicator
from base64 import b64encode
from .constants import PYVERSION, IPV4_PRIVATE_NETS, PROTOCOL_VERSION, FIELDS, FIELDS_TIME, LOG_FORMAT
import logging
import uuid

from pprint import pprint

IPV4_PRIVATE = pytricia.PyTricia()

Expand All @@ -32,6 +31,8 @@ class Indicator(object):

def __init__(self, indicator=None, version=PROTOCOL_VERSION, **kwargs):
self.version = version
self._lowercase = True
self._lowercase = kwargs.get('lowercase', True)

for k in FIELDS:
if k in ['indicator', 'confidence', 'count']: # handle this at the end
Expand All @@ -48,7 +49,8 @@ def __init__(self, indicator=None, version=PROTOCOL_VERSION, **kwargs):
continue

if isinstance(kwargs[k], basestring):
kwargs[k] = kwargs[k].lower()
if self._lowercase is True:
kwargs[k] = kwargs[k].lower()
if k in ['tags', 'peers']:
kwargs[k] = kwargs[k].split(',')

Expand Down Expand Up @@ -87,15 +89,20 @@ def indicator(self, i):
try:
i = codecs.unicode_escape_encode(i.decode('utf-8'))[0]
except Exception:
i = codecs.unicode_escape_encode(i.encode('utf-8', 'ignore').decode('utf-8'))[0]
i = codecs.unicode_escape_encode(
i.encode('utf-8', 'ignore').decode('utf-8'))[0]

i = i.lower()
if self._lowercase is True:
i = i.lower()
self.itype = resolve_itype(i)
self._indicator = i

if self.itype == 'url':
u = urlparse(self._indicator)
self._indicator = u.geturl().rstrip('/').lower()
if self._lowercase is True:
self._indicator = u.geturl().rstrip('/').lower()
else:
self._indicator = u.geturl().rstrip('/')

if self.itype == 'ipv4':
self._indicator = ipv4_normalize(self._indicator)
Expand Down Expand Up @@ -165,6 +172,18 @@ def confidence(self, v):
def confidence(self):
return self._confidence

@property
def lowercase(self):
return self._lowercase

@lowercase.setter
def lowercase(self, v):
self._lowercase = float(v)

@lowercase.getter
def lowercase(self):
return self._lowercase

@property
def count(self):
return self._count
Expand Down Expand Up @@ -244,7 +263,7 @@ def __repr__(self):
v = v.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

if isinstance(v, basestring):
if k is not 'message' and not k.endswith('time'):
if k is not 'message' and not k.endswith('time') and self._lowercase is False:
v = v.lower()

if k == 'confidence':
Expand Down
30 changes: 28 additions & 2 deletions test/test_indicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from csirtg_indicator.exceptions import InvalidIndicator
from random import randint, uniform


def test_indicator_ipv4():
i = Indicator('192.168.1.1')
assert i.is_private()
Expand All @@ -29,6 +30,30 @@ def test_indicator_url():
assert 'malware' in i.tags


def test_indicator_mixedcase_lower_false():
i = Indicator('http://example.org/MiXeDCaSe',
tags='botnet,malware', lowercase=False)

assert i.is_private() is False
assert i.indicator == 'http://example.org/MiXeDCaSe'
assert i.itype is not 'fqdn'
assert i.itype is 'url'
assert 'botnet' in i.tags
assert 'malware' in i.tags


def test_indicator_mixedcase_lower_true():
i = Indicator('http://example.org/MiXeDCaSe',
tags='botnet,malware', lowercase=True)

assert i.is_private() is False
assert i.indicator == 'http://example.org/mixedcase'
assert i.itype is not 'fqdn'
assert i.itype is 'url'
assert 'botnet' in i.tags
assert 'malware' in i.tags


def test_indicator_str():
i = Indicator('http://example.org', tags='botnet,malware')

Expand Down Expand Up @@ -68,7 +93,8 @@ def test_format_indicator():


def test_indicator_dest():
i = Indicator(indicator='192.168.1.1', dest='10.0.0.1', portlist="23", protocol="tcp", dest_portlist='21,22-23')
i = Indicator(indicator='192.168.1.1', dest='10.0.0.1',
portlist="23", protocol="tcp", dest_portlist='21,22-23')
assert i.dest
assert i.dest_portlist

Expand Down Expand Up @@ -109,4 +135,4 @@ def test_eq():
u2 = Indicator(indicator='192.168.1.1')

u2.uuid = u1.uuid
assert u1 == u2
assert u1 == u2

0 comments on commit 7bb818b

Please sign in to comment.