-
Notifications
You must be signed in to change notification settings - Fork 164
/
Copy pathasyncproc.py
216 lines (168 loc) · 6.01 KB
/
asyncproc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
import json
import boto3
import os
from helper import AwsHelper
import time
def startJob(bucketName, objectName, documentId, snsTopic, snsRole, detectForms, detectTables):
    """Start an asynchronous Textract job for one S3 object and return its job id.

    If neither forms nor tables are requested, ``start_document_text_detection``
    is called; otherwise ``start_document_analysis`` is called with the
    requested feature types.

    Args:
        bucketName: S3 bucket that holds the document.
        objectName: S3 object name (key) of the document.
        documentId: Sent as both ``ClientRequestToken`` and ``JobTag``.
        snsTopic: Value sent as ``SNSTopicArn`` in the notification channel.
        snsRole: Value sent as ``RoleArn`` in the notification channel.
        detectForms: Truthy to request the "FORMS" feature.
        detectTables: Truthy to request the "TABLES" feature.

    Returns:
        The ``JobId`` entry of the service response.
    """
    print("Starting job with documentId: {}, bucketName: {}, objectName: {}".format(documentId, bucketName, objectName))

    client = AwsHelper().getClient('textract')

    # Arguments shared by both start_* calls.
    commonArgs = {
        'ClientRequestToken': documentId,
        'DocumentLocation': {
            'S3Object': {
                'Bucket': bucketName,
                'Name': objectName,
            },
        },
        'NotificationChannel': {
            "RoleArn": snsRole,
            "SNSTopicArn": snsTopic,
        },
        'JobTag': documentId,
    }

    if detectForms or detectTables:
        # "TABLES" is listed before "FORMS" when both are requested.
        requested = ((detectTables, "TABLES"), (detectForms, "FORMS"))
        featureTypes = [label for wanted, label in requested if wanted]
        response = client.start_document_analysis(FeatureTypes=featureTypes, **commonArgs)
    else:
        response = client.start_document_text_detection(**commonArgs)

    return response["JobId"]
def processItem(message, snsTopic, snsRole):
    """Start a Textract job for the document described by one queue message.

    ``message['Body']`` must be a JSON string with the keys ``bucketName``,
    ``objectName``, ``documentId`` and ``features``. Forms/tables detection is
    enabled when ``'Forms'`` / ``'Tables'`` is contained in ``features``.

    Args:
        message: Queue message (a mapping with a ``'Body'`` entry).
        snsTopic: Forwarded to ``startJob``.
        snsRole: Forwarded to ``startJob``.

    Returns:
        The job id returned by ``startJob``.
    """
    print('message:')
    print(message)

    payload = json.loads(message['Body'])
    bucketName = payload['bucketName']
    objectName = payload['objectName']
    documentId = payload['documentId']
    features = payload['features']

    print('Bucket Name: ' + bucketName)
    print('Object Name: ' + objectName)
    print('Task ID: ' + documentId)
    print("API: {}".format(features))

    print('starting Textract job...')

    wantsForms = 'Forms' in features
    wantsTables = 'Tables' in features
    jobId = startJob(bucketName, objectName, documentId, snsTopic, snsRole, wantsForms, wantsTables)

    if jobId:
        print("Started Job with Id: {}".format(jobId))

    return jobId
def changeVisibility(sqs, qUrl, receipt_handle):
    """Set the visibility timeout of a received message to 0.

    Any exception raised by the call is caught and printed; nothing is
    re-raised, so this is a best-effort operation.

    Args:
        sqs: Client object exposing ``change_message_visibility``.
        qUrl: Queue URL.
        receipt_handle: Receipt handle of the message to update.
    """
    visibilityRequest = {
        'QueueUrl': qUrl,
        'ReceiptHandle': receipt_handle,
        'VisibilityTimeout': 0,
    }
    try:
        sqs.change_message_visibility(**visibilityRequest)
    except Exception as err:
        print("Failed to change visibility for {} with error: {}".format(receipt_handle, err))
def getMessagesFromQueue(sqs, qUrl):
    """Receive up to one message from the queue.

    The request uses ``MaxNumberOfMessages=1`` and ``VisibilityTimeout=60``.

    Args:
        sqs: Client object exposing ``receive_message``.
        qUrl: Queue URL.

    Returns:
        The ``'Messages'`` entry of the response when present, otherwise
        ``None``.
    """
    response = sqs.receive_message(
        QueueUrl=qUrl,
        MaxNumberOfMessages=1,
        VisibilityTimeout=60,  # the original carried a trailing "#14400" note here
    )

    print('SQS Response Recieved:')
    print(response)

    if 'Messages' not in response:
        print("No messages in queue.")
        return None

    return response['Messages']
def processItems(qUrl, snsTopic, snsRole):
    """Fetch one batch of queue messages and start a job for each.

    A message is deleted from the queue once its job has been started. If
    starting or deleting fails, the message's visibility timeout is reset to 0
    via ``changeVisibility``. Once a ``LimitExceededException`` or
    ``ProvisionedThroughputExceededException`` (matched by class name) has been
    seen, the remaining messages of the batch are not processed; they only get
    their visibility reset.

    Args:
        qUrl: Queue URL.
        snsTopic: Forwarded to ``processItem``.
        snsRole: Forwarded to ``processItem``.

    Returns:
        A tuple ``(totalMessages, startedJobs)``.

    Raises:
        Exception: The most recent limit/throughput exception caught while
            processing the batch, re-raised after the whole batch was visited.
    """
    limitErrorNames = ('LimitExceededException', "ProvisionedThroughputExceededException")

    sqs = AwsHelper().getClient('sqs')
    messages = getMessagesFromQueue(sqs, qUrl)

    if not messages:
        return 0, 0

    totalMessages = len(messages)
    print("Total messages: {}".format(totalMessages))

    startedJobs = 0
    limitException = None  # non-None once a limit/throughput error was caught

    for message in messages:
        receipt_handle = message['ReceiptHandle']

        try:
            if limitException is not None:
                # A limit was already hit: hand the message back untouched.
                changeVisibility(sqs, qUrl, receipt_handle)
                continue

            print("starting job...")
            processItem(message, snsTopic, snsRole)
            print("started job...")

            print('Deleting item from queue...')
            sqs.delete_message(QueueUrl=qUrl, ReceiptHandle=receipt_handle)
            print('Deleted item from queue...')

            startedJobs += 1
        except Exception as err:
            print("Error while starting job or deleting from queue: {}".format(err))
            changeVisibility(sqs, qUrl, receipt_handle)
            if err.__class__.__name__ in limitErrorNames:
                limitException = err

    if limitException is not None:
        raise limitException

    return totalMessages, startedJobs
def processRequest(request):
    """Drain the queue by repeatedly calling ``processItems``.

    The loop runs at most 100 times and stops early when:

    * ``processItems`` reports zero messages,
    * a ``LimitExceededException`` (matched by class name) is raised, or
    * more than 5 ``ProvisionedThroughputExceededException`` errors have been
      seen (each earlier one is followed by a 5 second sleep).

    Any other exception is logged and the loop moves on to its next
    iteration; it is never propagated to the caller.

    Args:
        request: Mapping with the keys ``'qUrl'``, ``'snsTopic'`` and
            ``'snsRole'``.

    Returns:
        A dict ``{'statusCode': 200, 'body': <summary string>}``.
    """
    qUrl = request['qUrl']
    snsTopic = request['snsTopic']
    snsRole = request['snsRole']

    maxIterations = 100
    maxThroughputErrors = 5
    retryDelaySeconds = 5

    totalJobsScheduled = 0
    hitLimit = False
    provisionedThroughputExceededCount = 0

    for _ in range(maxIterations):
        try:
            tc, jc = processItems(qUrl, snsTopic, snsRole)
        except Exception as e:
            errorName = e.__class__.__name__
            if errorName == 'LimitExceededException':
                print("Exception: Hit limit.")
                hitLimit = True
                break
            elif errorName == "ProvisionedThroughputExceededException":
                print("ProvisionedThroughputExceededException.")
                provisionedThroughputExceededCount += 1
                if provisionedThroughputExceededCount > maxThroughputErrors:
                    break
                print("Waiting for few seconds...")
                time.sleep(retryDelaySeconds)
                print("Waking up...")
            else:
                # Previously swallowed without a trace; keep the best-effort
                # retry but make the failure visible in the logs.
                print("Unexpected error while processing queue items: {}".format(e))
        else:
            totalJobsScheduled += jc
            if tc == 0:
                # Queue is empty: nothing more to schedule.
                break

    output = "Started {} jobs.".format(totalJobsScheduled)
    if hitLimit:
        output += " Hit limit."

    print(output)

    return {
        'statusCode': 200,
        'body': output
    }
def lambda_handler(event, context):
    """Lambda entry point: build the request from environment variables.

    Reads ``ASYNC_QUEUE_URL``, ``SNS_TOPIC_ARN`` and ``SNS_ROLE_ARN`` from the
    environment and passes them to ``processRequest``. ``event`` is only
    printed; ``context`` is unused.

    Returns:
        Whatever ``processRequest`` returns.
    """
    print("event: {}".format(event))

    request = {
        "qUrl": os.environ['ASYNC_QUEUE_URL'],
        "snsTopic": os.environ['SNS_TOPIC_ARN'],
        "snsRole": os.environ['SNS_ROLE_ARN'],
    }

    return processRequest(request)