Skip to content
This repository has been archived by the owner on Jun 17, 2023. It is now read-only.

Commit

Permalink
Decode bytes from utf-8 before returning. Also, add support for more …
Browse files Browse the repository at this point in the history
…itypes and unit tests. (#143)

Co-authored-by: wes <[email protected]>
  • Loading branch information
mdavis332 and wesyoung authored Jul 2, 2020
1 parent 339d461 commit 53d3c41
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 17 deletions.
26 changes: 26 additions & 0 deletions csirtg_indicator/constants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import sys
import re

PYVERSION = 2
if sys.version_info > (3,):
Expand Down Expand Up @@ -50,3 +51,28 @@
]

FIELDS = FIELDS_CORE + FIELDS_GEO + FIELDS_META + FIELDS_IP + FIELDS_TIME

RE_IPV4 = re.compile(r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(\d{1,3})$')
RE_IPV4_CIDR = re.compile(r'^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\/\d{1,2})$')

# http://stackoverflow.com/a/17871737
RE_IPV6 = re.compile(r'(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))')

# http://goo.gl/Cztyn2 -- probably needs more work
# http://stackoverflow.com/a/26987741/7205341
# ^((xn--)?(--)?[a-zA-Z0-9-_@]+(-[a-zA-Z0-9]+)*\.)+[a-zA-Z]{2,}(--p1ai)?$
#RE_FQDN = re.compile('^((?!-))(xn--)?[a-z0-9][a-z0-9-_\.]{0,61}[a-z0-9]{0,1}\.(xn--)?([a-z0-9\-]{1,61}|[a-z0-9-]{1,30}\.[a-z]{2,})$')
# http://stackoverflow.com/questions/14402407/maximum-length-of-a-domain-name-without-the-http-www-com-parts
RE_FQDN = re.compile(r'^((?!-))(xn--)?[a-z0-9][a-z0-9-_\.]{0,245}[a-z0-9]{0,1}\.(xn--)?([a-z0-9\-]{1,61}|[a-z0-9-]{1,30}\.[a-z]{2,})$')
RE_URI_SCHEMES = re.compile(r'^(https?|ftp)://')
RE_EMAIL = re.compile(r'^[_a-z0-9-\!\#\$\%\&\'\*\+\-\/\=\?\^\_\`\{\|\}\~]+(\.[_a-z0-9-\!\#\$\%\&\'\*\+\-\/\=\?\^\_\`\{\|\}\~]+)*@[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,4})$')
RE_ASN = re.compile(r'^(AS|as)[0-9]{1,6}$')

RE_HASH = {
'uuid': re.compile(r'^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$'),
'md5': re.compile(r'^[a-fA-F0-9]{32}$'),
'sha1': re.compile(r'^[a-fA-F0-9]{40}$'),
'sha256': re.compile(r'^[a-fA-F0-9]{64}$'),
'sha512': re.compile(r'^[a-fA-F0-9]{128}$'),
}

52 changes: 42 additions & 10 deletions csirtg_indicator/format/zstix.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from cybox.common import Hash
from cybox.objects.file_object import File
from cybox.objects.address_object import Address
from csirtg_indicator.constants import RE_IPV4, RE_IPV6, RE_FQDN, RE_EMAIL, RE_ASN, RE_HASH, RE_URI_SCHEMES
from csirtg_indicator import Indicator
import re


Expand Down Expand Up @@ -40,27 +42,50 @@ def _sha256(keypair):
h = Hash(shv, Hash.TYPE_SHA256)
f.add_hash(h)
return f

def _sha512(keypair):
shv = Hash()
shv.simple_hash_value = keypair.get('indicator')

f = File()
h = Hash(shv, Hash.TYPE_SHA512)
f.add_hash(h)
return f


def _address_email(address):
if RE_EMAIL.search(address):
return 1

def _address_ipv4(address):
if re.search(r'^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?).){3}', address):
if RE_IPV4.search(address):
return 1

def _address_ipv6(address):
if RE_IPV6.search(address):
return 1

def _address_fqdn(address):
if re.search(r'^[a-zA-Z0-9.-_]+.[a-z]{2,6}$', address):
if RE_FQDN.search(address):
return 1

def _address_url(address):
if re.search(r'^(ftp|https?)://', address):
if RE_URI_SCHEMES.search(address):
return 1

def _address(keypair):
address = keypair.get('indicator')
if _address_fqdn(address):

if _address_email(address):
return Address(address, 'e-mail')
elif _address_url(address):
return Address(address, 'url')
elif _address_fqdn(address):
return Address(address, 'fqdn')
elif _address_ipv4(address):
return Address(address, 'ipv4-addr')
elif _address_url(address):
return Address(address, 'url')
elif _address_ipv6(address):
return Address(address, 'ipv6-addr')

indicator = stix.indicator.Indicator(timestamp=arrow.get(d.get('reporttime')).datetime)
indicator.set_producer_identity(d.get('provider'))
Expand All @@ -70,13 +95,18 @@ def _address(keypair):
indicator.description = ','.join(d.get('tags'))

itype = d.get('itype')
i = d.get('indicator')

if itype == 'md5':
if itype == 'md5' or RE_HASH['md5'].search(i):
f = _md5(d)
elif itype == 'sha1':
elif itype == 'sha1' or RE_HASH['sha1'].search(i):
f = _sha1(d)
elif itype == 'sha256':
elif itype == 'sha256' or RE_HASH['sha256'].search(i):
f = _sha256(d)
elif itype == 'sha512' or RE_HASH['sha512'].search(i):
f = _sha512(d)
elif itype == 'asn' or RE_ASN.search(i):
f = Address(i, 'asn')
else:
f = _address(d)

Expand All @@ -89,7 +119,9 @@ def __repr__(self):
stix_package.stix_header = stix_header

for d in self.data:
if isinstance(d, Indicator):
d = d.__dict__()
i = self._create_indicator(d)
stix_package.add_indicator(i)

return str(stix_package.to_xml())
return str(stix_package.to_xml().decode('utf-8'))
103 changes: 96 additions & 7 deletions test/format/test_stix.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,132 @@
try:
# Python 2
from StringIO import StringIO
except ImportError:
# Python 3
from io import StringIO

STIX_ENABLED = True

try:
from stix.core import STIXHeader
from stix.core import STIXPackage
from csirtg_indicator.format.zstix import Stix
except ImportError:
STIX_ENABLED = False

data = [
# fqdn
{
'indicator': "example.com",
'provider': "me.com",
'tlp': "amber",
'confidence': "85",
'confidence': "10",
'reporttime': '2015-01-01T00:00:00Z',
'tags': ['botnet', 'malware']
},
# https with path
{
'indicator': "example.com",
'indicator': "https://example.com/with-path.php",
'provider': "me.com",
'tlp': "amber",
'confidence': "7",
'reporttime': '2015-01-01T00:00:00Z',
'tags': ['phishing']
},
# http no path
{
'indicator': "http://naked-url.tld",
'provider': "me.com",
'tlp': "amber",
'confidence': "85",
'confidence': "8.5",
'reporttime': '2015-01-01T00:00:00Z',
'tags': ['botnet', 'malware']
},
# ipv4
{
'indicator': "example.com",
'indicator': "1.1.1.1",
'provider': "me.com",
'tlp': "amber",
'confidence': "8",
'reporttime': '2015-01-01T00:00:00Z',
'tags': ['exploit', 'malware']
},
# email
{
'indicator': "[email protected]",
'provider': "me.com",
'tlp': "amber",
'confidence': "7.5",
'reporttime': '2015-01-01T00:00:00Z',
'tags': ['spam']
},
# ipv6
{
'indicator': "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
'provider': "me.com",
'tlp': "amber",
'confidence': "85",
'confidence': "8",
'reporttime': '2015-01-01T00:00:00Z',
'tags': ['botnet', 'malware']
},
# asn
{
'indicator': "23456",
'itype': "asn",
'provider': "me.com",
'tlp': "amber",
'confidence': "10",
'reporttime': '2015-01-01T00:00:00Z',
'tags': ['scanner']
},
# sha512
{
'indicator': "ee26b0dd4af7e749aa1a8ee3c10ae9923f618980772e473f8819a5d4940e0db27ac185f8a0e1d5f84f88bc887fd67b143732c304cc5fa9ad8e6f57f50028a8ff",
'provider': "me.com",
'tlp': "amber",
'confidence': "10",
'reporttime': '2015-01-01T00:00:00Z',
'tags': ['malware']
},
# md5
{
'indicator': "098f6bcd4621d373cade4e832627b4f6",
'provider': "me.com",
'tlp': "amber",
'confidence': "10",
'reporttime': '2015-01-01T00:00:00Z',
'tags': ['malware']
},
# sha1
{
'indicator': "a94a8fe5ccb19ba61c4c0873d391e987982fbbd3",
'provider': "me.com",
'tlp': "amber",
'confidence': "10",
'reporttime': '2015-01-01T00:00:00Z',
'tags': ['malware']
},
# sha256
{
'indicator': "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08",
'provider': "me.com",
'tlp': "amber",
'confidence': "10",
'reporttime': '2015-01-01T00:00:00Z',
'tags': ['malware']
}

]


def test_stix():
if STIX_ENABLED:
d = Stix(data)
print(d)
assert len(str(d)) > 2
stix_pkg = STIXPackage.from_xml(StringIO(str(d)))

assert len(stix_pkg.indicators.indicator) == len(data)
else:
print('STIX package not installed, skipping test')

if __name__ == '__main__':
test_stix()

0 comments on commit 53d3c41

Please sign in to comment.