forked from oasis-open/cti-python-stix2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutils.py
581 lines (472 loc) · 19.6 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
"""Utility functions and classes for the STIX2 library."""
import collections.abc
import datetime as dt
import enum
import json
import re
import pytz
import stix2.registry as mappings
import stix2.version
# Sentinel value for properties that should be set to the current time.
# We can't use the standard 'default' approach, since if there are multiple
# timestamps in a single object, the timestamps will vary by a few microseconds.
NOW = object()
# Matches any string whose first character is a lowercase ASCII letter.
# NOTE(review): the name suggests a STIX 2.1 naming-prefix check, but nothing
# in this module uses it -- confirm the intent against its callers.
PREFIX_21_REGEX = re.compile(r'^[a-z].*')
# strptime() patterns used by parse_into_datetime(): the first is applied to
# timestamp strings that contain no ".", the second to strings that do (i.e.
# that carry fractional seconds).  Both require a literal trailing "Z".
_TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
_TIMESTAMP_FORMAT_FRAC = "%Y-%m-%dT%H:%M:%S.%fZ"
class Precision(enum.Enum):
    """
    How many fractional-second digits a timestamp is formatted with.
    """

    # Values are written out by hand because enum.auto() only exists on
    # Python 3.6 and later.

    # Keep whatever sub-second digits the value has; nothing is truncated.
    ANY = 1
    # Whole seconds.
    SECOND = 2
    # Three fractional digits.
    MILLISECOND = 3
class PrecisionConstraint(enum.Enum):
    """
    How strictly a Precision value is enforced when a timestamp is formatted.

    A constraint is meaningless in combination with Precision.ANY, so it is
    ignored in that case.
    """

    # The formatted value has exactly the requested precision.
    EXACT = 1
    # The formatted value has the requested precision or finer.
    MIN = 2
    # A MAX constraint has not been needed so far.
def to_enum(value, enum_type, enum_default=None):
    """
    Coerce a loosely-typed value into a member of enum_type.

    This lets public APIs accept plain strings (matched case-insensitively
    against member names) and None, while still guaranteeing callers end up
    with a real enum member.

    :param value: An enum_type member, a string naming one of its members,
        or None
    :param enum_type: The enum class that strings are looked up in
    :param enum_default: Member to return when value is None.  Must be an
        enum_type member or None; None means there is no default, so a None
        value is rejected.
    :return: A member of enum_type
    :raises TypeError: If value is not an enum_type member, not a string, and
        not a None that can be replaced by a default
    :raises KeyError: If value is a string that names no member of enum_type
    """
    assert enum_default is None or isinstance(enum_default, enum_type)

    # Already the right kind of enum: hand it back untouched.
    if isinstance(value, enum_type):
        return value

    # None is only acceptable when a default was supplied.
    if value is None and enum_default is not None:
        return enum_default

    # Member names are upper case, so normalise before the lookup.
    if isinstance(value, str):
        return enum_type[value.upper()]

    raise TypeError(
        "Not a valid {}: {}".format(enum_type.__name__, value),
    )
class STIXdatetime(dt.datetime):
    """
    A datetime that also carries timestamp-formatting metadata, so that JSON
    serialization has the info it needs to produce compliant timestamps.

    Besides the regular ``datetime`` constructor arguments, two optional
    keyword arguments are accepted:

    :param precision: A Precision member or the (case-insensitive) name of
        one.  Defaults to Precision.ANY.
    :param precision_constraint: A PrecisionConstraint member or the
        (case-insensitive) name of one.  Defaults to
        PrecisionConstraint.EXACT.

    As a convenience, an existing ``datetime`` may be passed as the first
    positional argument instead of the individual date/time fields; its
    fields and tzinfo are copied.
    """

    def __new__(cls, *args, **kwargs):
        # Strip our own keywords before delegating; datetime.__new__ would
        # reject them.
        precision = to_enum(
            kwargs.pop("precision", Precision.ANY),
            Precision,
        )
        precision_constraint = to_enum(
            kwargs.pop("precision_constraint", PrecisionConstraint.EXACT),
            PrecisionConstraint,
        )

        # Allow passing in a datetime object.  The `args and` guard keeps
        # keyword-only construction (year=..., month=..., day=...) from
        # failing with an IndexError on an empty args tuple.
        if args and isinstance(args[0], dt.datetime):
            dttm = args[0]
            args = (
                dttm.year, dttm.month, dttm.day, dttm.hour, dttm.minute,
                dttm.second, dttm.microsecond, dttm.tzinfo,
            )

        # self will be an instance of STIXdatetime, not dt.datetime
        self = dt.datetime.__new__(cls, *args, **kwargs)
        self.precision = precision
        self.precision_constraint = precision_constraint
        return self

    def __repr__(self):
        # Show the value as the quoted STIX timestamp string it serializes to.
        return "'%s'" % format_datetime(self)
def deduplicate(stix_obj_list):
    """Collapse a list of STIX objects down to one entry per object version.

    An object version is identified by its 'id' together with its 'modified'
    value, falling back to 'created' when 'modified' is absent or empty.
    Objects that carry neither are identified by 'id' alone.

    Note: when the list contains several objects with the same version, the
    one appearing last in the list is the one that is kept.

    Args:
        stix_obj_list (list): list of STIX objects (dicts)

    Returns:
        A list holding one object per distinct version found in the input.

    """
    latest_by_version = {}

    for stix_obj in stix_obj_list:
        version = stix_obj.get("modified") or stix_obj.get("created")
        # Unversioned objects are keyed by bare ID, versioned ones by
        # (ID, version); a later duplicate simply overwrites an earlier one.
        if version is None:
            key = stix_obj["id"]
        else:
            key = (stix_obj["id"], version)
        latest_by_version[key] = stix_obj

    return list(latest_by_version.values())
def get_timestamp():
    """Build a STIXdatetime for the current moment, expressed in UTC."""
    current_utc = STIXdatetime.now(tz=pytz.UTC)
    return current_utc
def format_datetime(dttm):
    """Render a datetime as a STIX timestamp string.

    The value is first expressed in UTC (a timezone-naive value is assumed to
    already be UTC), formatted as ``YYYY-MM-DDTHH:MM:SS``, optionally followed
    by "." and fractional-second digits, and finished with "Z".

    The fractional digits depend on the ``precision`` and
    ``precision_constraint`` attributes of dttm; when those are missing,
    Precision.ANY and PrecisionConstraint.EXACT are used:

    * ANY: all significant digits, trailing zeros removed (constraint ignored)
    * SECOND / MIN: same as ANY
    * SECOND / otherwise: no fractional part
    * any other precision (i.e. MILLISECOND) / EXACT: exactly three digits
    * any other precision / otherwise: at least three digits, with trailing
      zeros removed beyond the third
    """
    has_offset = (
        dttm.tzinfo is not None
        and dttm.tzinfo.utcoffset(dttm) is not None
    )
    if has_offset:
        zoned = dttm.astimezone(pytz.utc)
    else:
        # dttm is timezone-naive; assume UTC
        zoned = pytz.utc.localize(dttm)

    ts = zoned.strftime('%Y-%m-%dT%H:%M:%S')
    precision = getattr(dttm, 'precision', Precision.ANY)
    precision_constraint = getattr(
        dttm, 'precision_constraint', PrecisionConstraint.EXACT,
    )

    # All six microsecond digits, and the same with trailing zeros dropped
    # (which is the empty string when there are no microseconds at all).
    all_digits = "{:06d}".format(zoned.microsecond)
    significant_digits = all_digits.rstrip("0")

    if precision == Precision.ANY:
        # No need to truncate; ignore constraint
        frac_seconds_str = significant_digits
    elif precision == Precision.SECOND:
        if precision_constraint == PrecisionConstraint.MIN:
            # second precision, or better: just use all our digits
            frac_seconds_str = significant_digits
        else:
            # exact: ignore microseconds entirely
            frac_seconds_str = ""
    elif precision_constraint == PrecisionConstraint.EXACT:
        # millisecond, exact: slice rather than strip, so that significant
        # zeros within the first three digits are kept
        frac_seconds_str = all_digits[:3]
    else:
        # millisecond precision, or better: pad back up to three digits in
        # case stripping zeros went too far
        frac_seconds_str = significant_digits.ljust(3, "0")

    separator = "." if frac_seconds_str else ""
    return "{}{}{}Z".format(ts, separator, frac_seconds_str)
def parse_into_datetime(
    value, precision=Precision.ANY,
    precision_constraint=PrecisionConstraint.EXACT,
):
    """
    Parse a value into a valid STIX timestamp object.  Also, optionally adjust
    precision of fractional seconds.  This allows alignment with JSON
    serialization requirements, and helps ensure we're not using extra
    precision which would be lost upon JSON serialization.  The precision
    info will be embedded in the returned object, so that JSON serialization
    will format it correctly.

    :param value: A datetime.datetime or datetime.date instance, or a string
        in ``YYYY-MM-DDTHH:MM:SS[.ffffff]Z`` form
    :param precision: A precision value: either an instance of the Precision
        enum, or a string naming one of the enum values (case-insensitive)
    :param precision_constraint: A precision constraint value: either an
        instance of the PrecisionConstraint enum, or a string naming one of
        the enum values (case-insensitive)
    :return: A STIXdatetime instance, which is a datetime but also carries the
        precision info necessary to properly JSON-serialize it.
    :raises ValueError: If value is not a date/datetime and cannot be parsed
        as a timestamp string (this includes non-string values)
    """
    precision = to_enum(precision, Precision)
    precision_constraint = to_enum(precision_constraint, PrecisionConstraint)

    if isinstance(value, dt.date):
        if hasattr(value, 'hour'):
            # A full datetime (datetime is a subclass of date): use as-is.
            ts = value
        else:
            # Add a time component: midnight UTC
            ts = dt.datetime.combine(value, dt.time(0, 0, tzinfo=pytz.utc))
    else:
        # value isn't a date or datetime object so assume it's a string
        try:
            # The format is chosen inside the try block: `"." in value`
            # itself raises TypeError for values such as ints or None, and
            # that must surface as the ValueError below rather than leak out.
            fmt = _TIMESTAMP_FORMAT_FRAC if "." in value else _TIMESTAMP_FORMAT
            parsed = dt.datetime.strptime(value, fmt)
        except (TypeError, ValueError):
            # Unknown format
            raise ValueError(
                "must be a datetime object, date object, or "
                "timestamp string in a recognizable format.",
            )
        if parsed.tzinfo:
            ts = parsed.astimezone(pytz.utc)
        else:
            # Doesn't have timezone info in the string; assume UTC
            ts = pytz.utc.localize(parsed)

    # Ensure correct precision
    if precision == Precision.SECOND:
        if precision_constraint == PrecisionConstraint.EXACT:
            ts = ts.replace(microsecond=0)
        # else, no need to modify fractional seconds
    elif precision == Precision.MILLISECOND:
        if precision_constraint == PrecisionConstraint.EXACT:
            # Truncate (not round) to whole milliseconds.
            us = (ts.microsecond // 1000) * 1000
            ts = ts.replace(microsecond=us)
        # else: at least millisecond precision: the constraint will affect
        # JSON formatting, but there's nothing we need to do here.
    # else, precision == Precision.ANY: nothing for us to do.

    return STIXdatetime(
        ts, precision=precision, precision_constraint=precision_constraint,
    )
def _get_dict(data):
    """Coerce data into a dictionary.

    Accepted inputs, tried in this order:

    * an exact ``dict`` instance, which is returned as-is (not copied)
    * a JSON document as a string (via ``json.loads``)
    * a file-like object containing JSON (via ``json.load``)
    * anything the ``dict()`` constructor accepts

    Raises ValueError if none of the above applies.  Malformed JSON text is
    not treated as "wrong kind of input": the decoder's error propagates.
    """
    if type(data) is dict:
        return data

    # Each JSON entry point signals "not my kind of input" with a different
    # exception; only that exception moves us on to the next attempt.
    json_attempts = (
        (json.loads, TypeError),
        (json.load, AttributeError),
    )
    for decode, wrong_input_error in json_attempts:
        try:
            return decode(data)
        except wrong_input_error:
            continue

    try:
        return dict(data)
    except (ValueError, TypeError):
        raise ValueError("Cannot convert '%s' to dictionary." % str(data))
def get_class_hierarchy_names(obj):
    """List the class names in obj's method resolution order.

    The first entry is the name of obj's own class; base classes follow in
    MRO order.
    """
    return [klass.__name__ for klass in obj.__class__.__mro__]
def get_type_from_id(stix_id):
    """Return the part of stix_id that precedes its first "--".

    If stix_id contains no "--", the whole string is returned.
    """
    type_prefix, _, _ = stix_id.partition('--')
    return type_prefix
def detect_spec_version(stix_dict):
    """
    Given a dict representing a STIX object, try to detect what spec version
    it is likely to comply with.

    :param stix_dict: A dict with some STIX content.  Must at least have a
        "type" property.
    :return: A STIX version in "X.Y" format
    :raises KeyError: If stix_dict has no "type" property
    """
    obj_type = stix_dict["type"]

    if 'spec_version' in stix_dict:
        # For STIX 2.0, applies to bundles only.  Presence in a bundle implies
        # STIX 2.0; the value applies to the content of the bundle, not the
        # bundle itself, so we don't care here about the value.
        #
        # For STIX 2.1+, applies to non-bundles only.
        if obj_type == "bundle":
            v = "2.0"
        else:
            v = stix_dict['spec_version']

    elif "id" not in stix_dict:
        # Only 2.0 SCOs don't have ID properties
        v = "2.0"

    elif obj_type == 'bundle':
        # Bundle without a spec_version property: must be 2.1.  But to
        # future-proof, use max version over all contained objects, with 2.1
        # minimum.
        #
        # A bundle with a missing or empty "objects" list has nothing to
        # inspect, so the 2.1 minimum applies on its own instead of failing
        # with KeyError or max()'s empty-sequence ValueError.
        contained = stix_dict.get("objects") or ()
        # NOTE: versions are compared as strings, which orders "2.0"/"2.1"
        # correctly but would not for a two-digit minor version.
        v = max(["2.1"] + [detect_spec_version(obj) for obj in contained])

    elif obj_type in mappings.STIX2_OBJ_MAPS["2.1"]["observables"]:
        # Non-bundle object with an ID and without spec_version.  Could be a
        # 2.1 SCO or 2.0 SDO/SRO/marking.  Check for 2.1 SCO...
        v = "2.1"

    else:
        # Not a 2.1 SCO; must be a 2.0 object.
        v = "2.0"

    return v
def _stix_type_of(value):
    """
    Work out the STIX type a value refers to.

    A string containing "--" is treated as a STIX ID and its type prefix is
    returned; any other string is taken to already be a STIX type and is
    returned unchanged; a non-string is treated as a mapping and its "type"
    entry is returned.

    :param value: A mapping with a "type" property, or a STIX ID or type
        as a string
    :return: A STIX type
    """
    if not isinstance(value, str):
        return value["type"]

    if "--" in value:
        return get_type_from_id(value)

    return value
def is_sdo(value, stix_version=stix2.version.DEFAULT_VERSION):
    """
    Tell whether an object, type, or ID denotes an SDO in the given STIX
    version.

    For a type or ID string, this only checks that the type is registered
    under "objects" for that version and is not one of the non-SDO types
    registered there (relationship, sighting, marking-definition, bundle,
    language-content).  For a mapping, *simple* STIX version inference is
    done first and a mismatch with stix_version yields False.  The value is
    not otherwise validated.

    :param value: A mapping with a "type" property, or a STIX ID or type
        as a string
    :param stix_version: A STIX version as a string
    :return: True if the type of the given value is an SDO type of the given
        version; False if not
    """
    # A mapping whose inferred version differs can't match, whatever its type.
    if isinstance(value, collections.abc.Mapping):
        if detect_spec_version(value) != stix_version:
            return False

    version_maps = mappings.STIX2_OBJ_MAPS[stix_version]
    stix_type = _stix_type_of(value)

    return stix_type in version_maps["objects"] and stix_type not in {
        "relationship", "sighting", "marking-definition", "bundle",
        "language-content",
    }
def is_sco(value, stix_version=stix2.version.DEFAULT_VERSION):
    """
    Tell whether an object, type, or ID denotes an SCO in the given STIX
    version.

    For a type or ID string, this only checks that the type is registered
    under "observables" for that version.  For a mapping, *simple* STIX
    version inference is done first and a mismatch with stix_version yields
    False.  The value is not otherwise validated.

    :param value: A mapping with a "type" property, or a STIX ID or type
        as a string
    :param stix_version: A STIX version as a string
    :return: True if the type of the given value is an SCO type of the given
        version; False if not
    """
    # A mapping whose inferred version differs can't match, whatever its type.
    if isinstance(value, collections.abc.Mapping):
        if detect_spec_version(value) != stix_version:
            return False

    version_maps = mappings.STIX2_OBJ_MAPS[stix_version]
    stix_type = _stix_type_of(value)

    return stix_type in version_maps["observables"]
def is_sro(value, stix_version=stix2.version.DEFAULT_VERSION):
    """
    Tell whether an object, type, or ID denotes an SRO in the given STIX
    version.

    For a type or ID string, this only checks that the type is "sighting" or
    "relationship".  For a mapping, *simple* STIX version inference is done
    first and a mismatch with stix_version yields False.  The value is not
    otherwise validated.

    :param value: A mapping with a "type" property, or a STIX ID or type
        as a string
    :param stix_version: A STIX version as a string
    :return: True if the type of the given value is an SRO type of the given
        version; False if not
    """
    # A mapping whose inferred version differs can't match, whatever its type.
    if isinstance(value, collections.abc.Mapping):
        if detect_spec_version(value) != stix_version:
            return False

    # The SRO types are fixed, so the registry isn't consulted here.
    stix_type = _stix_type_of(value)
    return stix_type in ("sighting", "relationship")
def is_object(value, stix_version=stix2.version.DEFAULT_VERSION):
    """
    Tell whether an object, type, or ID denotes any STIX object of the given
    STIX version, i.e. anything registered under "observables" or "objects"
    for that version.

    For a type or ID string, only that registration is checked.  For a
    mapping, *simple* STIX version inference is done first and a mismatch
    with stix_version yields False.  The value is not otherwise validated.

    :param value: A mapping with a "type" property, or a STIX ID or type
        as a string
    :param stix_version: A STIX version as a string
    :return: True if the type of the given value is a valid STIX type with
        respect to the given STIX version; False if not
    """
    # A mapping whose inferred version differs can't match, whatever its type.
    if isinstance(value, collections.abc.Mapping):
        if detect_spec_version(value) != stix_version:
            return False

    version_maps = mappings.STIX2_OBJ_MAPS[stix_version]
    stix_type = _stix_type_of(value)

    return any(
        stix_type in version_maps[category]
        for category in ("observables", "objects")
    )
def is_marking(value, stix_version=stix2.version.DEFAULT_VERSION):
    """
    Tell whether an object, type, or ID denotes a marking definition of the
    given STIX version.

    For a type or ID string, this only checks that the type is
    "marking-definition".  For a mapping, *simple* STIX version inference is
    done first and a mismatch with stix_version yields False.  The value is
    not otherwise validated.

    :param value: A STIX object, object ID, or type as a string.
    :param stix_version: A STIX version as a string
    :return: True if the value is/is for a marking definition, False otherwise.
    """
    # A mapping whose inferred version differs can't match, whatever its type.
    if isinstance(value, collections.abc.Mapping):
        if detect_spec_version(value) != stix_version:
            return False

    # The type name is fixed, so the registry isn't consulted here.
    stix_type = _stix_type_of(value)
    return stix_type == "marking-definition"
class STIXTypeClass(enum.Enum):
    """
    Broad categories of STIX type, usable as constraints in is_stix_type().
    """

    # Checked with is_sdo()
    SDO = 0
    # Checked with is_sco()
    SCO = 1
    # Checked with is_sro()
    SRO = 2
def is_stix_type(value, stix_version=stix2.version.DEFAULT_VERSION, *types):
    """
    Tell whether the type of a value satisfies at least one of the given
    constraints.

    Each entry of 'types' is either a STIX type string, which must match the
    value's type exactly (and the value must be a STIX object of the given
    version), or a STIXTypeClass member, which requires the value to belong
    to that class of STIX type.  The constraints are OR'd together: the first
    one that is satisfied decides the result.  With no constraints at all,
    the result is False.

    :param value: A mapping with a "type" property, or a STIX ID or type
        as a string
    :param stix_version: A STIX version as a string
    :param types: A sequence of STIX type strings or STIXTypeClass enum values
    :return: True if the object or type satisfies the constraints; False if not
    """
    for constraint in types:
        if constraint is STIXTypeClass.SDO:
            satisfied = is_sdo(value, stix_version)
        elif constraint is STIXTypeClass.SCO:
            satisfied = is_sco(value, stix_version)
        elif constraint is STIXTypeClass.SRO:
            satisfied = is_sro(value, stix_version)
        else:
            # Not a class enum, so treat the constraint as a STIX type
            # string and require an exact match.
            satisfied = (
                _stix_type_of(value) == constraint
                and is_object(value, stix_version)
            )

        # One satisfied constraint is enough.
        if satisfied:
            return satisfied

    return False