Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SQSServiceSensor for non-polling SQS sensor #91

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions sensors/sqs_service_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
"""
This is generic SQS Sensor using boto3 api to fetch messages from sqs queue.
After receiving a message it's content is passed as payload to a trigger 'aws.sqs_new_message'
This sensor can be configured either by using config.yaml within a pack or by creating
following values in datastore:
- aws.input_queues (list queues as comma separated string: first_queue,second_queue)
- aws.aws_access_key_id
- aws.aws_secret_access_key
- aws.region
- aws.max_number_of_messages (must be between 1 - 10)
For configuration in config.yaml with config like this
setup:
aws_access_key_id:
aws_access_key_id:
region:
sqs_sensor:
input_queues:
- first_queue
- second_queue
sqs_other:
max_number_of_messages: 1
If any value exist in datastore it will be taken instead of any value in config.yaml
"""

import six
import json
from boto3.session import Session
from botocore.exceptions import ClientError
from botocore.exceptions import NoRegionError
from botocore.exceptions import NoCredentialsError
from botocore.exceptions import EndpointConnectionError

from st2reactor.sensor.base import Sensor


class AWSSQSServiceSensor(Sensor):
def __init__(self, sensor_service, config=None):
super(AWSSQSServiceSensor, self).__init__(sensor_service=sensor_service, config=config)

def setup(self):
self._logger = self._sensor_service.get_logger(name=self.__class__.__name__)

self.session = None
self.sqs_res = None

def run(self):
# setting SQS ServiceResource object from the parameter of datastore or configuration file
self._may_setup_sqs()

while True:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume there is no yielding needed in this function (aka eventlet.sleep(0.01) at the end or similar) because _receive_messages performs a network operation which already needs to yield at some point even if there are no messages to be retrieved.

Otherwise if that's not the case and _receive_messages could immediately return this could cause CPU spikes and 100% CPU utilization by the sensor process since there is no yielding.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

boto3 receive_messages accepts a WaitTimeSeconds argument, which _receive_messages defaults to 2 seconds. That's what's keeping the loop from spinning too fast.

for queue in self.input_queues:
msgs = self._receive_messages(queue=self._get_queue_by_name(queue),
num_messages=self.max_number_of_messages)
for msg in msgs:
if msg:
payload = {"queue": queue, "body": json.loads(msg.body)}
self._sensor_service.dispatch(trigger="aws.sqs_new_message",
payload=payload)
msg.delete()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this method throw?

If so, it probably wouldn't be a bad idea to wrap it in try / catch to avoid a scenario where the same message would always throw for some reason which would prevent sensor from continuing the processing since it would always crash on exception...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this throws an exception, it's not because SQS couldn't delete the message due to some unrecoverable error. It will be because of a misconfiguration on the client side or something like that. I don't think it is necessary to try/catch this.


def cleanup(self):
pass

def add_trigger(self, trigger):
# This method is called when trigger is created
pass

def update_trigger(self, trigger):
# This method is called when trigger is updated
pass

def remove_trigger(self, trigger):
pass

def _get_config_entry(self, key, prefix=None):
''' Get configuration values either from Datastore or config file. '''
config = self.config
if prefix:
config = self._config.get(prefix, {})

value = self._sensor_service.get_value('aws.%s' % (key), local=False)
if not value:
value = config.get(key, None)

if not value and config.get('setup', None):
value = config['setup'].get(key, None)

return value

def _may_setup_sqs(self):
queues = self._get_config_entry(key='input_queues', prefix='sqs_sensor')

# XXX: This is a hack as from datastore we can only receive a string while
# from config.yaml we can receive a list
if isinstance(queues, six.string_types):
self.input_queues = [x.strip() for x in queues.split(',')]
elif isinstance(queues, list):
self.input_queues = queues
else:
self.input_queues = []

self.aws_access_key = self._get_config_entry('aws_access_key_id')
self.aws_secret_key = self._get_config_entry('aws_secret_access_key')
self.aws_region = self._get_config_entry('region')

self.max_number_of_messages = self._get_config_entry('max_number_of_messages',
prefix='sqs_other')

# checker configuration is update, or not
def _is_same_credentials():
c = self.session.get_credentials()
return c is not None and \
c.access_key == self.aws_access_key and \
c.secret_key == self.aws_secret_key and \
self.session.region_name == self.aws_region

if self.session is None or not _is_same_credentials():
self._setup_sqs()

def _setup_sqs(self):
''' Setup Boto3 structures '''
self._logger.debug('Setting up SQS resources')
self.session = Session(aws_access_key_id=self.aws_access_key,
aws_secret_access_key=self.aws_secret_key,
region_name=self.aws_region)

try:
self.sqs_res = self.session.resource('sqs')
except NoRegionError:
self._logger.warning("The specified region '%s' is invalid", self.aws_region)

def _get_queue_by_name(self, queueName):
''' Fetch QUEUE by it's name create new one if queue doesn't exist '''
try:
return self.sqs_res.get_queue_by_name(QueueName=queueName)
except ClientError as e:
if e.response['Error']['Code'] == 'AWS.SimpleQueueService.NonExistentQueue':
self._logger.warning("SQS Queue: %s doesn't exist, creating it.", queueName)
return self.sqs_res.create_queue(QueueName=queueName)
elif e.response['Error']['Code'] == 'InvalidClientTokenId':
self._logger.warning("Cloudn't operate sqs because of invalid credential config")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also throw (and abort sensor processing) on this error?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why this was logged and not raised. It shouldn't be possible to get here if you have invalid credentials. We tested that scenario, and it raises an exception. I can't recall if it did so when getting the resource or creating the session.

else:
raise
except NoCredentialsError as e:
self._logger.warning("Cloudn't operate sqs because of invalid credential config")
except EndpointConnectionError as e:
self._logger.warning(e)

def _receive_messages(self, queue, num_messages, wait_time=2):
''' Receive a message from queue and return it. '''
if queue:
return queue.receive_messages(WaitTimeSeconds=wait_time,
MaxNumberOfMessages=num_messages)
else:
return []
15 changes: 15 additions & 0 deletions sensors/sqs_service_sensor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
class_name: "AWSSQSServiceSensor"
entry_point: "sqs_service_sensor.py"
description: "Service Sensor which monitors a SQS queue for new messages"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would also be good to document here in the description and also in README how this sensor differentiates from other one :)

trigger_types:
-
name: "sqs_new_message"
description: "Trigger which indicates that a new message has arrived"
payload_schema:
type: "object"
properties:
queue:
type: "string"
body:
type: "object"