Skip to content

Commit

Permalink
[worker] Acknowledge message after all operations
Browse files Browse the repository at this point in the history
  • Loading branch information
Samuel Hassine committed Oct 31, 2019
1 parent b207d77 commit bd1935c
Showing 1 changed file with 4 additions and 3 deletions.
7 changes: 4 additions & 3 deletions opencti-worker/src/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,21 @@ def data_handler(self, connection, channel, delivery_tag, data):
content = base64.b64decode(data['content']).decode('utf-8')
types = data['entities_types'] if 'entities_types' in data else []
imported_data = self.api.stix2_import_bundle(content, True, types)
cb = functools.partial(self.ack_message, channel, delivery_tag)
connection.add_callback_threadsafe(cb)

if job_id is not None:
messages = []
by_types = groupby(imported_data, key=lambda x: x['type'])
for key, grp in by_types:
messages.append(str(len(list(grp))) + ' imported ' + key)
self.api.job.update_job(job_id, 'complete', messages)
cb = functools.partial(self.ack_message, channel, delivery_tag)
connection.add_callback_threadsafe(cb)
return True
except RequestException as re:
logging.error('A connection error occurred: { ' + str(re) + ' }')
logging.info('Message (delivery_tag=' + str(delivery_tag) + ') NOT acknowledged')
cb = functools.partial(self.stop_consume, channel)
connection.add_callback_threadsafe(cb)
return False
except Exception as e:
logging.error('An unexpected error occurred: { ' + str(e) + ' }')
cb = functools.partial(self.ack_message, channel, delivery_tag)
Expand Down

0 comments on commit bd1935c

Please sign in to comment.