-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
69cda72
commit 915da5e
Showing
9 changed files
with
117 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
import type { Request, Response } from "express"; | ||
import { errorWrapper } from "@bcgov/citz-imb-express-utilities"; | ||
import { addToQueue } from "../queue"; | ||
|
||
// Adds to test queue which processes one request per 10 seconds. | ||
export const addToRabbitQueue = errorWrapper(async (req: Request, res: Response) => { | ||
try { | ||
const jobID = `job-${Date.now()}`; | ||
|
||
// Add the job ID to the RabbitMQ queue | ||
await addToQueue(jobID); | ||
|
||
// Respond with job ID | ||
res | ||
.status(200) | ||
.json({ success: true, message: "Queue will process a job every 10 seconds.", jobID }); | ||
} catch (error) { | ||
res.status(500).json({ | ||
success: false, | ||
error: | ||
error instanceof Error | ||
? error.message | ||
: "An unknown error occurred during addToRabbitQueue.", | ||
}); | ||
} | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
export * from "./addToRabbitQueue"; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
export { default as router } from "./router"; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
import amqp from "amqplib"; | ||
import { ENV } from "src/config"; | ||
|
||
const { RABBITMQ_URL } = ENV; | ||
const QUEUE_NAME = "test_queue"; | ||
|
||
// Connect to RabbitMQ and create a channel | ||
const connectToRabbitMQ = async () => { | ||
try { | ||
if (!RABBITMQ_URL) throw new Error("RABBITMQ_URL env variable is undefined."); | ||
const connection = await amqp.connect(RABBITMQ_URL); | ||
const channel = await connection.createChannel(); | ||
await channel.assertQueue(QUEUE_NAME, { durable: false }); | ||
return channel; | ||
} catch (error) { | ||
console.error("Failed to connect to RabbitMQ:", error); | ||
throw error; | ||
} | ||
}; | ||
|
||
// Create a channel instance for reuse | ||
let channelPromise: Promise<amqp.Channel>; | ||
|
||
// Get or create a channel | ||
export const getChannel = async () => { | ||
if (!channelPromise) { | ||
channelPromise = connectToRabbitMQ(); | ||
} | ||
return channelPromise; | ||
}; | ||
|
||
// Add a message to the queue | ||
export const addToQueue = async (message: string) => { | ||
const channel = await getChannel(); | ||
channel.sendToQueue(QUEUE_NAME, Buffer.from(message)); | ||
}; | ||
|
||
// Start consuming messages from the queue | ||
export const startQueueConsumer = async () => { | ||
try { | ||
const channel = await getChannel(); | ||
channel.prefetch(1); // Only process one message at a time | ||
channel.consume( | ||
QUEUE_NAME, | ||
(msg) => { | ||
if (msg) { | ||
const jobID = msg.content.toString(); | ||
console.log(`Processed job: ${jobID}`); | ||
setTimeout(() => channel.ack(msg), 10 * 1000); | ||
} | ||
}, | ||
{ noAck: false }, | ||
); | ||
} catch (error) { | ||
console.error("Failed to consume messages from RabbitMQ:", error); | ||
} | ||
}; | ||
|
||
// Start the consumer immediately | ||
startQueueConsumer(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
import { Router } from "express"; | ||
import { addToRabbitQueue } from "./controllers"; | ||
|
||
const router = Router(); | ||
|
||
router.post("/addToQueue", addToRabbitQueue); | ||
|
||
export default router; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters