-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp_tier.js
152 lines (129 loc) · 4.65 KB
/
app_tier.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
const AWS = require('aws-sdk');
const fs = require('fs');
const { execSync } = require('child_process');
const config = require('./config.json');
const aws_access_key_id = config.aws_access_key_id;
const aws_secret_access_key = config.aws_secret_access_key;
const region = config.region;
const sqs_request_url = config.sqs_request_url;
const sqs_response_url = config.sqs_response_url;
const s3_input_bucket = config.s3_input_bucket;
const s3_output_bucket = config.s3_output_bucket;
const input_path = config.input_path;
// Configure AWS credentials and region
AWS.config.update({
accessKeyId: aws_access_key_id,
secretAccessKey: aws_secret_access_key,
region: region
});
const sqs = new AWS.SQS();
const s3 = new AWS.S3();
const SQS_REQUEST_URL = sqs_request_url;
const SQS_RESPONSE_URL = sqs_response_url;
const S3_INPUT_BUCKET = s3_input_bucket;
const S3_OUTPUT_BUCKET = s3_output_bucket;
const INPUT_PATH = input_path;
// Function to read requests from SQS and process them
const readRequests = async () => {
try {
console.log("1");
const sqsResponse = await sqs.receiveMessage({
QueueUrl: SQS_REQUEST_URL,
MaxNumberOfMessages: 1,
WaitTimeSeconds: 20,
VisibilityTimeout: 30
}).promise();
if (!sqsResponse.Messages) {
// If no messages, re-invoke readRequests after a delay
setTimeout(readRequests, 5000); // Wait 5 seconds before re-invoking
return;
}
const sqsMessage = sqsResponse.Messages[0];
console.log("sqsResponse -> ", sqsResponse);
console.log("SQS Msg -> ", sqsMessage);
if(sqsMessage){
const receiptHandle = sqsMessage.ReceiptHandle;
console.log(`Message Received: ${sqsMessage.Body}`);
const imageBody = sqsMessage.Body;
await downloadImageFromS3(imageBody.split("/").pop());
// Delete received message from SQS
await sqs.deleteMessage({
QueueUrl: SQS_REQUEST_URL,
ReceiptHandle: receiptHandle
}).promise();
}
// After processing current message, re-invoke readRequests after a delay
setTimeout(readRequests, 0); // Immediately re-invoke
} catch (error) {
console.error(`An error occurred while processing requests: ${error}`);
}
};
// Function to download image from S3
const downloadImageFromS3 = async (imageName) => {
try {
console.log("2");
if (!fs.existsSync(INPUT_PATH)) {
fs.mkdirSync(INPUT_PATH);
}
console.log(`Downloading image: ${imageName}`);
const s3Params = {
Bucket: S3_INPUT_BUCKET,
Key: imageName
};
console.log("s3Params", s3Params);
const data = await s3.getObject(s3Params).promise();
console.log("S3.getobject Data ->", data);
fs.writeFileSync(INPUT_PATH + imageName, data.Body);
const response = await classifyImage(INPUT_PATH + imageName, imageName);
console.log("Classify Image response ->", response);
} catch (error) {
console.error(`An error occurred while downloading image: ${error}`);
}
};
// Function to classify image
const classifyImage = async (pathToFile, imageName) => {
try {
console.log("3");
const classifierPath = '/home/ubuntu/cloud_computing_project/model/face_recognition.py';
const imageNameWithoutExtension = imageName.split('.')[0]; // Remove the file extension
const modelPrediction = execSync(`python3 ${classifierPath} ${pathToFile}`).toString();
const result = modelPrediction.trim();
console.log(`Classification results: ${result}`);
const respo = await saveResultInS3Output(imageName, result);
console.log("saveResultInS3Output response -> ", respo);
await sendResultToSqsResponse(imageName, result);
} catch (error) {
console.error(`An error occurred while classifying image: ${error}`);
}
};
// Function to save result in S3 output
const saveResultInS3Output = async (key, value) => {
try {
console.log("4");
const result = `(${key},${value})`;
console.log("result ->", result);
await s3.putObject({
Bucket: S3_OUTPUT_BUCKET,
Key: key.split('.')[0],
Body: result
}).promise();
} catch (error) {
console.error(`An error occurred while saving the classification result: ${error}`);
}
};
// Function to send result to SQS response
const sendResultToSqsResponse = async (key, value) => {
try {
console.log("5");
const sqsMessage = `${key},${value}`;
const sqsResponse = await sqs.sendMessage({
QueueUrl: SQS_RESPONSE_URL,
MessageBody: sqsMessage
}).promise();
console.log(`Message sent to response queue. MessageId: ${sqsResponse.MessageId}`);
} catch (error) {
console.error(`An error occurred while sending the response: ${error}`);
}
};
// Start reading requests
readRequests();