Skip to content

Commit

Permalink
mws-3953 -- bulk api with subscribers and topic details
Browse files Browse the repository at this point in the history
  • Loading branch information
shiva-sc committed Feb 12, 2025
1 parent 886be60 commit a55f2ee
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 34 deletions.
13 changes: 10 additions & 3 deletions .env-example
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ password=password

keySalt=salt
validHosts=["localhost:8080"]

BASE_URL="http://localhost:8080"

# Setting require to enable 50k more download
CDS_NOTIFY_END_POINT=
Expand All @@ -40,10 +40,17 @@ AWS_ACCESS_KEY=
AWS_SECRET_ACCESS_KEY=
AWS_BUCKET=

# used by bulk api, make sure has a space at end
BULK_GC_NOTIFY_PREPEND="ApiKey-v1 "

#bulk queue options
BULK_Q_TYPE=exponential
BULK_Q_DELAY=300000
BULK_Q_ATTEMPTS=20

# REDIS
REDIS_ENV=stage
REDIS_URI=x-notify-redis
REDIS_URI=notify-redis-1
REDIS_PORT=6379
REDIS_SENTINEL_1_URI=127.0.0.1
REDIS_SENTINEL_1_PORT=26379
Expand All @@ -55,4 +62,4 @@ REDIS_MASTER_NAME=x-notify-master
transport='{"host":"","port":"587","secure": true,"auth":{"user":"","pass":""}}'

# Prevent TLS error
NODE_TLS_REJECT_UNAUTHORIZED='0'
NODE_TLS_REJECT_UNAUTHORIZED='0'
5 changes: 3 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ COPY package*.json ./

RUN npm install -g nodemon
RUN npm install
RUN export NODE_OPTIONS=--max_old_space_size=4096 #4GB

COPY . .

WORKDIR ./x-notify/

COPY ./.env-example ./.env

WORKDIR ./

CMD [ "npm", "start" ]
121 changes: 121 additions & 0 deletions controllers/bulkApiMailer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
const fetch = require('node-fetch');
const mailingManager = require('./mailing');
const mailSend = require("../helpers/mailSend");
const { bulkQueue } = require("../notifyQueue");

const BASE_URL = process.env.BASE_URL || "https://apps.canada.ca/x-notify";
const bulkAPI = "https://api.notification.canada.ca/v2/notifications/bulk";
const BULK_GC_NOTIFY_PREPEND = process.env.BULK_GC_NOTIFY_PREPEND || "ApiKey-v1 ";
const BULK_Q_ATTEMPTS = parseInt(process.env.BULK_Q_ATTEMPTS) || 20;
const BULK_Q_TYPE = process.env.BULK_Q_TYPE || "exponential";
const BULK_Q_DELAY = parseInt(process.env.BULK_Q_DELAY) || 300000; // 5 min

// Process jobs
bulkQueue.process(async (job) => {
try {
let mailingState = mailingManager.mailingState;
let jobData = job.data;

// Making the Bulk API POST request
let response = await fetch(bulkAPI, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
"Authorization" : BULK_GC_NOTIFY_PREPEND + jobData.notifyKey
},
body: JSON.stringify( jobData.bulkEmailBody ),
}).then(response => {
if (!response.ok) {
throw new Error(`HTTP Error Status: ${response.status}`);
}
return response.json();
})
.then( result => {
mailingManager.mailingUpdate( jobData.mailingId, mailingState.sent, { historyState: mailingState.sending } );
})

} catch (error) {
if (error.message.includes('HTTP Error Status: 5')) {
throw new Error('Retryable error'); // Ensures Bull retries
}
}
});

// Listen for failures
bulkQueue.on('failed', (job, err) => {
console.error(` bulkQueue Job ${job.id} failed: ${err.message}`);
});

exports.sendBulkEmails = async ( mailingId, topicId, subject, mailingBody ) => {
try {
let mailing_name = "Bulk_email-" + topicId;
let mailingTopic = await mailingManager.getTopic( topicId );

if ( !mailingTopic ) {
console.log( " Bulkmailer -- sendBulkEmails: no mailingTopic found with: " + topicId);
throw new Error( "Bulkmailer sendBulkEmails: no mailingTopic found with: " + topicId );
}

if ( !mailingTopic.nTemplateMailingId || !mailingTopic.notifyKey ) {
console.log( " Bulkmailer -- sendBulkEmails: check mailingTopic details with topicId: " + topicId );
throw new Error( "Bulkmailer -- sendBulkEmails: check mailingTopic details with topicId: " + topicId );
}

let subscribers = await mailSend.getConfirmedSubscriberAsArray( topicId );
if ( !subscribers.length) {
console.log( " Bulkmailer -- sendBulkEmails : No subscribers found for the topic: " + topicId );
throw new Error( "Bulkmailer -- sendBulkEmails: No subscribers found for the topic: " + topicId );
}

let formattedSubsArray = await formatSubsArray( subscribers, mailingBody, subject);
let bulkEmailBody = {
"name": mailing_name,
"template_id": mailingTopic.nTemplateMailingId,
"rows": formattedSubsArray
}

bulkQueue.add(
{
bulkEmailBody: bulkEmailBody,
notifyKey: mailingTopic.notifyKey,
mailingId: mailingId,
},
{
attempts: BULK_Q_ATTEMPTS, // Maximum number of retries
backoff: {
type: 'BULK_Q_TYPE', // Use exponential backoff or fixed
delay: BULK_Q_DELAY // Initial delay of 1 second (doubles each retry)
}
}
);
} catch (err) {
console.log( 'sendBulkEmails error: ')
console.log( err )
throw Error('sendBulkEmails error: ' + err, 500)
}
}

formatSubsArray = async ( listEmail, mailingBody, subject) => {

let i, i_len = listEmail.length, subscriber;
let subsArray = [
["subject", "email address", "body", "unsub_link"]
]
for( i = 0; i !== i_len; i++) {
subscriber = listEmail[ i ];

const { email, _id } = subscriber;

const userCodeUrl = ( _id ? _id.toHexString() : _id );

if ( !email || !userCodeUrl ) {
continue;
}

let unsub_link = BASE_URL + "/subs/remove/" + userCodeUrl
subsArray.push( [subject, email, mailingBody, unsub_link] )
}

return subsArray

}
38 changes: 27 additions & 11 deletions controllers/mailing.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const dbConn = module.parent.parent.exports.dbConn;
const ObjectId = require('mongodb').ObjectId;

const { Worker } = require('worker_threads');
const bulkApiMailer = require('./bulkApiMailer');

const _mailingState = {
cancelled: "cancelled",
Expand Down Expand Up @@ -232,20 +233,33 @@ exports.mailingCancelSendToSub = async ( mailingId ) => {
}

exports.mailingSendToSub = async ( mailingId ) => {
// Need to be in current state "approved"
let mailing = await dbConn.collection( "mailing" ).findOne( { _id: ObjectId( mailingId ) } );
if ( !mailing ) {
console.log( "mailingSendToSub: Invalid mailing id: " + mailingId );
throw new Error( "mailingSendToSub: Mailing unavailable" );
}

const rDoc = await mailingUpdate( mailingId, _mailingState.sending, { historyState: _mailingState.approved } );

let topic = await getTopic( mailing.topicId );
if ( !topic ) {
console.log( "mailingSendToSub: no topic: " + mailing.topicId );
throw Error( "mailingSendToSub : no topic with topicId: " + mailing.topicId);
}

// Need to be in current state "approved"
const rDoc = await mailingUpdate( mailingId, _mailingState.sending, { historyState: _mailingState.approved } );
// Check if the operation was successful, if not we know the error is already logged
if ( !rDoc ) {
return true;
}

// Do the sending
sendMailingToSubs( mailingId, rDoc.topicId, rDoc.subject, rDoc.body );



//if the bulkMail flag is set emails are delivered using bulk api
if ( topic.bulkMail ) {
bulkApiMailer.sendBulkEmails( mailingId, rDoc.topicId, rDoc.subject, rDoc.body );
} else {
// Do the sending
sendMailingToSubs( mailingId, rDoc.topicId, rDoc.subject, rDoc.body );
}

// When completed, change state to "sent"

}
Expand Down Expand Up @@ -389,7 +403,7 @@ async function mailingUpdate( mailingId, newHistoryState, options ) {
// Send the mailing to the "approval email list"
return rDoc.value;
}

exports.mailingUpdate = mailingUpdate;

// Simple worker to send mailing
async function sendMailingToSubs ( mailingId, topicId, mailingSubject, mailingBody ) {
Expand Down Expand Up @@ -492,7 +506,8 @@ getTopic = ( topicId ) => {
unsubURL: 1,
thankURL: 1,
failURL: 1,
inputErrURL: 1
inputErrURL: 1,
bulkMail: 1,
}
} ).catch( (e) => {
console.log( "getTopic" );
Expand All @@ -512,4 +527,5 @@ getTopic = ( topicId ) => {

return topic;

}
}
exports.getTopic = getTopic
2 changes: 1 addition & 1 deletion controllers/subscriptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const processEnv = process.env,
_subsLinkSuffix = processEnv.subsLinkSuffix || "853e0212b92a127";


const redisUri = process.env.REDIS_URI || 'x-notify-redis';
const redisUri = process.env.REDIS_URI || 'notify-redis-1';
const redisPort = process.env.REDIS_PORT || '6379';
const redisSentinel1Uri = process.env.REDIS_SENTINEL_1_URI || '127.0.0.1';
const redisSentinel1Port = process.env.REDIS_SENTINEL_1_PORT || '26379';
Expand Down
2 changes: 2 additions & 0 deletions controllers/workerSendEmail.js
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,5 @@ getConfirmedSubscriberAsArray = async ( topicId ) => {

return docsItems;
};

exports.getConfirmedSubscriberAsArray = getConfirmedSubscriberAsArray;
36 changes: 23 additions & 13 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,38 +1,48 @@
version: '3.8'
services:
mongo:
notify-mongo-1:
image: mongo:4.2
container_name: x-notify-mongo
container_name: notify-mongo-1
ports:
- target: 27017
published: 27016
protocol: tcp
mode: host
networks:
- x-notify-net
redis:
- notify-net-1
volumes:
- mongo_data:/data/db # Persistent volume for MongoDB
- mongo_config:/data/configdb # Configuration volume

notify-redis-1:
image: redis:6.0.1
container_name: x-notify-redis
container_name: notify-redis-1
ports:
- "6379:6379"
networks:
- x-notify-net
x-notify:
- notify-net-1
notify-node-1:
build: ./
container_name: x-notify
container_name: notify-node-1
ports:
- "8080:8080"
restart: on-failure
environment:
- MONGODB_URI=mongodb://mongo:27017/test
- MONGODB_URI=mongodb://notify-mongo-1
- NODE_ENV=development
env_file: ".env"
depends_on:
- mongo
- notify-mongo-1
- notify-redis-1
volumes:
- .:/x-notify
- .:/notify-1
- /node_modules
networks:
- x-notify-net
- notify-net-1
volumes:
mongo_data:
mongo_config:

networks:
x-notify-net:
notify-net-1:
driver: bridge
7 changes: 6 additions & 1 deletion notifyQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');

const redisUri = process.env.REDIS_URI || 'x-notify-redis';
const redisUri = process.env.REDIS_URI || 'notify-redis-1';
const redisPort = process.env.REDIS_PORT || '6379';
const redisSentinel1Uri = process.env.REDIS_SENTINEL_1_URI || '127.0.0.1';
const redisSentinel1Port = process.env.REDIS_SENTINEL_1_PORT || '26379';
Expand Down Expand Up @@ -35,11 +35,16 @@ if (process.env.NODE_ENV === 'prod') {

const notifyQueue = new Queue('sendMail', redisConf);

//used by bulk Api manager
const bulkQueue = new Queue('bulk-api', redisConf);
exports.bulkQueue = bulkQueue;

const serverAdapter = new ExpressAdapter();

createBullBoard({
queues: [
new BullAdapter( notifyQueue ),
new BullAdapter( bulkQueue ),
],
serverAdapter
})
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
"author": "Government of Canada",
"contributors": [
"duboisp",
"GormFrank"
"GormFrank",
"shiva-sc"
],
"scripts": {
"start": "nodemon -L server.js",
Expand All @@ -37,6 +38,7 @@
"mustache": "^4.0.1",
"nodemailer": "^6.4.6",
"notifications-node-client": "^4.8.0",
"node-fetch": "^2.7.0",
"passport": "^0.4.1",
"passport-http": "^0.3.0",
"passport-local": "^1.0.0"
Expand Down
8 changes: 7 additions & 1 deletion server.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ MongoClient.connect( processEnv.MONGODB_URI || '', {useUnifiedTopology: true} ).
const adminController = require('./controllers/admin');
const mailingController = require('./controllers/mailing_view');
const userController = require('./controllers/user');

const bulkApiController = require('./controllers/bulkApiMailer');


/**
* Express configuration.
Expand Down Expand Up @@ -248,6 +249,11 @@ MongoClient.connect( processEnv.MONGODB_URI || '', {useUnifiedTopology: true} ).
userController.isAuthenticated,
mailingController.v_mailingGetTopicStats);

//bulk-api
app.post('/v2/bulk-mail',
//userController.isAuthenticated,
bulkApiController.sendBulkEmails);

/**
* SMTP Mail routes.
*/
Expand Down
Loading

0 comments on commit a55f2ee

Please sign in to comment.