diff --git a/opencti-worker/src/worker.py b/opencti-worker/src/worker.py index 2678248237cc..5d5578a979ef 100644 --- a/opencti-worker/src/worker.py +++ b/opencti-worker/src/worker.py @@ -70,20 +70,21 @@ def data_handler(self, connection, channel, delivery_tag, data): content = base64.b64decode(data['content']).decode('utf-8') types = data['entities_types'] if 'entities_types' in data else [] imported_data = self.api.stix2_import_bundle(content, True, types) - cb = functools.partial(self.ack_message, channel, delivery_tag) - connection.add_callback_threadsafe(cb) - if job_id is not None: messages = [] by_types = groupby(imported_data, key=lambda x: x['type']) for key, grp in by_types: messages.append(str(len(list(grp))) + ' imported ' + key) self.api.job.update_job(job_id, 'complete', messages) + cb = functools.partial(self.ack_message, channel, delivery_tag) + connection.add_callback_threadsafe(cb) + return True except RequestException as re: logging.error('A connection error occurred: { ' + str(re) + ' }') logging.info('Message (delivery_tag=' + str(delivery_tag) + ') NOT acknowledged') cb = functools.partial(self.stop_consume, channel) connection.add_callback_threadsafe(cb) + return False except Exception as e: logging.error('An unexpected error occurred: { ' + str(e) + ' }') cb = functools.partial(self.ack_message, channel, delivery_tag)