Skip to content
This repository has been archived by the owner on Jun 17, 2023. It is now read-only.

Commit

Permalink
feature: add upsert support for ES (#479)
Browse files Browse the repository at this point in the history
* add es upsert support

* add es upsert unit tests

* add upsert support to dev helpers

* update travis file to run upsert tests

* fix upsert support in travis file
  • Loading branch information
ckrez authored and wesyoung committed Nov 8, 2019
1 parent 2386366 commit c269923
Show file tree
Hide file tree
Showing 10 changed files with 495 additions and 95 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ python:
- 3.6

env:
- CIF_ELASTICSEARCH_TEST=1
- CIF_ELASTICSEARCH_TEST=1 CIF_STORE_ES_UPSERT_MODE=0
- CIF_ELASTICSEARCH_TEST=1 CIF_STORE_ES_UPSERT_MODE=1

before_install:
- docker pull elasticsearch:5
Expand Down
2 changes: 2 additions & 0 deletions Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ redhat=0
rhel_user=ENV['RHEL_USER']
rhel_pass=ENV['RHEL_PASSWORD']
es_tests=ENV.fetch('CIF_ELASTICSEARCH_TEST', '0')
es_upsert=ENV.fetch('CIF_STORE_ES_UPSERT_MODE', '0')

redhat=1 if distro == 'redhat'

Expand All @@ -31,6 +32,7 @@ $script = <<SCRIPT
export CIF_ANSIBLE_SDIST=#{sdist}
export CIF_ANSIBLE_ES=#{es}
export CIF_ELASTICSEARCH_TEST=#{es_tests}
export CIF_STORE_ES_UPSERT_MODE=#{es_upsert}
export CIF_HUNTER_THREADS=#{hunter_threads}
export CIF_HUNTER_ADVANCED=#{hunter_advanced}
export CIF_GATHERER_GEO_FQDN=#{geo_fqdn}
Expand Down
18 changes: 18 additions & 0 deletions cif/store/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,15 @@ def _flush_create_queue(self):
except (TypeError, binascii.Error) as e:
pass

if not i.get('lasttime'):
i['lasttime'] = arrow.utcnow().datetime.replace(tzinfo=None)

if not i.get('firsttime'):
i['firsttime'] = i['lasttime']

if not i.get('reporttime'):
i['reporttime'] = i['lasttime']

n = self.store.indicators.upsert(_t, data)

t_time = time.time() - start_time
Expand Down Expand Up @@ -313,6 +322,15 @@ def handle_indicators_create(self, token, data, id=None, client_id=None, flush=F
except (TypeError, binascii.Error) as e:
pass

if not i.get('lasttime'):
i['lasttime'] = arrow.utcnow().datetime.replace(tzinfo=None)

if not i.get('firsttime'):
i['firsttime'] = i['lasttime']

if not i.get('reporttime'):
i['reporttime'] = i['lasttime']

n = self.store.indicators.upsert(t, data, flush=flush)

t = time.time() - start_time
Expand Down
9 changes: 9 additions & 0 deletions cif/store/zelasticsearch/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,21 @@
TIMEOUT = os.getenv('CIF_ES_TIMEOUT', TIMEOUT)
TIMEOUT = '{}s'.format(TIMEOUT)

REQUEST_TIMEOUT = 60
REQUEST_TIMEOUT = os.getenv('CIF_ES_REQ_TIMEOUT', REQUEST_TIMEOUT)

UPSERT_MODE = os.getenv('CIF_STORE_ES_UPSERT_MODE', False)
if UPSERT_MODE == '1':
UPSERT_MODE = True
else:
UPSERT_MODE = False

PARTITION = os.getenv('CIF_STORE_ES_PARTITION', 'month')

DELETE_FILTERS = os.getenv('CIF_STORE_ES_DELETE_FILTERS', 'id, indicator, provider')
DELETE_FILTERS = DELETE_FILTERS.split(',')
DELETE_FILTERS = list(set((x.strip() for x in DELETE_FILTERS)))

UPSERT_MATCH = os.getenv('CIF_STORE_ES_UPSERT_MATCH', 'indicator, provider, confidence, tags, group, tlp, rdata')
UPSERT_MATCH = UPSERT_MATCH.split(',')
UPSERT_MATCH = set((x.strip() for x in UPSERT_MATCH))
Loading

0 comments on commit c269923

Please sign in to comment.