Skip to content

Commit

Permalink
mws-3953 -- bulk api with subscribers and topic details
Browse files Browse the repository at this point in the history
  • Loading branch information
shiva-sc committed Feb 11, 2025
1 parent 886be60 commit 1147875
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 33 deletions.
6 changes: 4 additions & 2 deletions .env-example
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ password=password

keySalt=salt
validHosts=["localhost:8080"]

BASE_URL="http://localhost:8080"

# Setting require to enable 50k more download
CDS_NOTIFY_END_POINT=
Expand All @@ -40,10 +40,12 @@ AWS_ACCESS_KEY=
AWS_SECRET_ACCESS_KEY=
AWS_BUCKET=

# used by bulk api, make sure has a space at end
BULK_GC_NOTIFY_PREPEND="ApiKey-v1 "

# REDIS
REDIS_ENV=stage
REDIS_URI=x-notify-redis
REDIS_URI=notify-redis-1
REDIS_PORT=6379
REDIS_SENTINEL_1_URI=127.0.0.1
REDIS_SENTINEL_1_PORT=26379
Expand Down
5 changes: 3 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ COPY package*.json ./

RUN npm install -g nodemon
RUN npm install
RUN export NODE_OPTIONS=--max_old_space_size=4096 #4GB

COPY . .

WORKDIR ./x-notify/

COPY ./.env-example ./.env

WORKDIR ./

CMD [ "npm", "start" ]
119 changes: 119 additions & 0 deletions controllers/bulkApiMailer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
const fetch = require('node-fetch');
const mailingManager = require('./mailing');
const mailSend = require("../helpers/mailSend");
const { bulkQueue } = require("../notifyQueue");

const BASE_URL = process.env.BASE_URL || "https://apps.canada.ca/x-notify";
const bulkAPI = "https://api.notification.canada.ca/v2/notifications/bulk";
const BULK_GC_NOTIFY_PREPEND = process.env.BULK_GC_NOTIFY_PREPEND || "ApiKey-v1 ";

// Process jobs
bulkQueue.process(async (job) => {
try {
let mailingState = mailingManager.mailingState;
let jobData = job.data;

// Making the Bulk API POST request
let response = await fetch(bulkAPI, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
"Authorization" : BULK_GC_NOTIFY_PREPEND + jobData.notifyKey
},
body: JSON.stringify( jobData.bulkEmailBody ),
}).then(response => {
if (!response.ok) {
throw new Error(`HTTP Error Status: ${response.status}`);
}
return response.json();
})
.then( result => {
mailingManager.mailingUpdate( jobData.mailingId, mailingState.sent, { historyState: mailingState.sending } );
})

} catch (error) {
if (error.message.includes('HTTP Error Status: 5')) {
throw new Error('Retryable error'); // Ensures Bull retries
}
}
});

// Listen for failures
bulkQueue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed: ${err.message}`);
});

exports.sendBulkEmails = async ( mailingId, topicId, subject, mailingBody ) => {
try {
let mailing_name = "Bulk_email-" + topicId;
let mailingTopic = await mailingManager.getTopic( topicId );

if ( !mailingTopic ) {
console.log( " Bulkmailer -- sendBulkEmails mailingTopic" );
console.log( e );
throw new Error( "Bulkmailer sendBulkEmails: Can't find the topic: " + topicId );
}

if ( !mailingTopic.nTemplateMailingId || !mailingTopic.notifyKey ) {
console.log( " Bulkmailer -- sendBulkEmails : check mailingTopic details" );
console.log( e );
throw new Error( "Bulkmailer sendBulkEmails: Can't find the topic: " + topicId );
}

let subscribers = await mailSend.getConfirmedSubscriberAsArray( topicId );
if ( !subscribers.length) {
console.log( " Bulkmailer -- sendBulkEmails : No subscribers" );
console.log( e );
throw new Error( "Bulkmailer sendBulkEmails: No subscribers for the topic: " + topicId );
}

let formattedSubsArray = await formatSubsArray( subscribers, mailingBody, subject);
let bulkEmailBody = {
"name": mailing_name,
"template_id": mailingTopic.nTemplateMailingId,
"rows": formattedSubsArray
}

bulkQueue.add(
{
bulkEmailBody: bulkEmailBody,
notifyKey: mailingTopic.notifyKey,
mailingId: mailingId,
},
{
attempts: 5, // Maximum number of retries
backoff: {
type: 'exponential', // Use exponential backoff
delay: 5000 // Initial delay of 1 second (doubles each retry)
}
}
);
} catch (err) {
throw Error('sendBulkEmails error: ' + err, 500)
}
}

formatSubsArray = async ( listEmail, mailingBody, subject) => {

let i, i_len = listEmail.length, subscriber;
let subsArray = [
["subject", "email address", "body", "unsub_link"]
]
for( i = 0; i !== i_len; i++) {
subscriber = listEmail[ i ];

const { email, _id } = subscriber;

const userCodeUrl = ( _id ? _id.toHexString() : _id );

if ( !email || !userCodeUrl ) {
continue;
}

let unsub_link = BASE_URL + "/subs/remove/" + userCodeUrl
subsArray.push( [subject, email, mailingBody, unsub_link] )
}

return subsArray

}
38 changes: 27 additions & 11 deletions controllers/mailing.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const dbConn = module.parent.parent.exports.dbConn;
const ObjectId = require('mongodb').ObjectId;

const { Worker } = require('worker_threads');
const bulkApiMailer = require('./bulkApiMailer');

const _mailingState = {
cancelled: "cancelled",
Expand Down Expand Up @@ -232,20 +233,33 @@ exports.mailingCancelSendToSub = async ( mailingId ) => {
}

exports.mailingSendToSub = async ( mailingId ) => {
// Need to be in current state "approved"
let mailing = await dbConn.collection( "mailing" ).findOne( { _id: ObjectId( mailingId ) } );
if ( !mailing ) {
console.log( "mailingSendToSub: Invalid mailing id: " + mailingId );
throw new Error( "mailingSendToSub: Mailing unavailable" );
}

const rDoc = await mailingUpdate( mailingId, _mailingState.sending, { historyState: _mailingState.approved } );

let topic = await getTopic( mailing.topicId );
if ( !topic ) {
console.log( "mailingSendToSub: no topic: " + mailing.topicId );
throw Error( "mailingSendToSub : no topic with topicId: " + mailing.topicId);
}

// Need to be in current state "approved"
const rDoc = await mailingUpdate( mailingId, _mailingState.sending, { historyState: _mailingState.approved } );
// Check if the operation was successful, if not we know the error is already logged
if ( !rDoc ) {
return true;
}

// Do the sending
sendMailingToSubs( mailingId, rDoc.topicId, rDoc.subject, rDoc.body );



//if the bulkMail flag is set emails are delivered using bulk api
if ( topic.bulkMail ) {
bulkApiMailer.sendBulkEmails( mailingId, rDoc.topicId, rDoc.subject, rDoc.body );
} else {
// Do the sending
sendMailingToSubs( mailingId, rDoc.topicId, rDoc.subject, rDoc.body );
}

// When completed, change state to "sent"

}
Expand Down Expand Up @@ -389,7 +403,7 @@ async function mailingUpdate( mailingId, newHistoryState, options ) {
// Send the mailing to the "approval email list"
return rDoc.value;
}

exports.mailingUpdate = mailingUpdate;

// Simple worker to send mailing
async function sendMailingToSubs ( mailingId, topicId, mailingSubject, mailingBody ) {
Expand Down Expand Up @@ -492,7 +506,8 @@ getTopic = ( topicId ) => {
unsubURL: 1,
thankURL: 1,
failURL: 1,
inputErrURL: 1
inputErrURL: 1,
bulkMail: 1,
}
} ).catch( (e) => {
console.log( "getTopic" );
Expand All @@ -512,4 +527,5 @@ getTopic = ( topicId ) => {

return topic;

}
}
exports.getTopic = getTopic
2 changes: 1 addition & 1 deletion controllers/subscriptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const processEnv = process.env,
_subsLinkSuffix = processEnv.subsLinkSuffix || "853e0212b92a127";


const redisUri = process.env.REDIS_URI || 'x-notify-redis';
const redisUri = process.env.REDIS_URI || 'notify-redis-1';
const redisPort = process.env.REDIS_PORT || '6379';
const redisSentinel1Uri = process.env.REDIS_SENTINEL_1_URI || '127.0.0.1';
const redisSentinel1Port = process.env.REDIS_SENTINEL_1_PORT || '26379';
Expand Down
2 changes: 2 additions & 0 deletions controllers/workerSendEmail.js
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,5 @@ getConfirmedSubscriberAsArray = async ( topicId ) => {

return docsItems;
};

exports.getConfirmedSubscriberAsArray = getConfirmedSubscriberAsArray;
36 changes: 23 additions & 13 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,38 +1,48 @@
version: '3.8'
services:
mongo:
notify-mongo-1:
image: mongo:4.2
container_name: x-notify-mongo
container_name: notify-mongo-1
ports:
- target: 27017
published: 27016
protocol: tcp
mode: host
networks:
- x-notify-net
redis:
- notify-net-1
volumes:
- mongo_data:/data/db # Persistent volume for MongoDB
- mongo_config:/data/configdb # Configuration volume

notify-redis-1:
image: redis:6.0.1
container_name: x-notify-redis
container_name: notify-redis-1
ports:
- "6379:6379"
networks:
- x-notify-net
x-notify:
- notify-net-1
notify-node-1:
build: ./
container_name: x-notify
container_name: notify-node-1
ports:
- "8080:8080"
restart: on-failure
environment:
- MONGODB_URI=mongodb://mongo:27017/test
- MONGODB_URI=mongodb://notify-mongo-1
- NODE_ENV=development
env_file: ".env"
depends_on:
- mongo
- notify-mongo-1
- notify-redis-1
volumes:
- .:/x-notify
- .:/notify-1
- /node_modules
networks:
- x-notify-net
- notify-net-1
volumes:
mongo_data:
mongo_config:

networks:
x-notify-net:
notify-net-1:
driver: bridge
7 changes: 6 additions & 1 deletion notifyQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');

const redisUri = process.env.REDIS_URI || 'x-notify-redis';
const redisUri = process.env.REDIS_URI || 'notify-redis-1';
const redisPort = process.env.REDIS_PORT || '6379';
const redisSentinel1Uri = process.env.REDIS_SENTINEL_1_URI || '127.0.0.1';
const redisSentinel1Port = process.env.REDIS_SENTINEL_1_PORT || '26379';
Expand Down Expand Up @@ -35,11 +35,16 @@ if (process.env.NODE_ENV === 'prod') {

const notifyQueue = new Queue('sendMail', redisConf);

//used by bulk Api manager
const bulkQueue = new Queue('bulk-api', redisConf);
exports.bulkQueue = bulkQueue;

const serverAdapter = new ExpressAdapter();

createBullBoard({
queues: [
new BullAdapter( notifyQueue ),
new BullAdapter( bulkQueue ),
],
serverAdapter
})
Expand Down
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
"author": "Government of Canada",
"contributors": [
"duboisp",
"GormFrank"
"GormFrank",
"shiva-sc"
],
"scripts": {
"start": "nodemon -L server.js",
Expand All @@ -37,6 +38,7 @@
"mustache": "^4.0.1",
"nodemailer": "^6.4.6",
"notifications-node-client": "^4.8.0",
"node-fetch": "^2.7.0",
"passport": "^0.4.1",
"passport-http": "^0.3.0",
"passport-local": "^1.0.0"
Expand Down
8 changes: 7 additions & 1 deletion server.js
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ MongoClient.connect( processEnv.MONGODB_URI || '', {useUnifiedTopology: true} ).
const adminController = require('./controllers/admin');
const mailingController = require('./controllers/mailing_view');
const userController = require('./controllers/user');

const bulkApiController = require('./controllers/bulkApiMailer');


/**
* Express configuration.
Expand Down Expand Up @@ -248,6 +249,11 @@ MongoClient.connect( processEnv.MONGODB_URI || '', {useUnifiedTopology: true} ).
userController.isAuthenticated,
mailingController.v_mailingGetTopicStats);

//bulk-api
app.post('/v2/bulk-mail',
//userController.isAuthenticated,
bulkApiController.sendBulkEmails);

/**
* SMTP Mail routes.
*/
Expand Down
3 changes: 2 additions & 1 deletion setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ Note: We need to set the Service ID associated to the topic details (field: `nSe

### REDIS Default Configuration
* `REDIS_ENV` Set environment value for Redis. Default: `stage` and `prod` which would leverage the redis-sentinel in production environment
* `REDIS_URI` Redis URI, the alias or the IP of the server host. Default: `x-notify-redis`
* `REDIS_URI` Redis URI, the alias or the IP of the server host. Default: `notify-redis-1`
* `REDIS_PORT` Port of Redis server. Default: `6379`
* `REDIS_SENTINEL_1_URI` Redis Sentinel 1 URI. Default: `127.0.0.1`
* `REDIS_SENTINEL_1_PORT` Redis Sentinel 1 PORT. Default: `26379`
Expand All @@ -168,6 +168,7 @@ topics
* failURL: Failure URL for server error,
* inputErrURL: Failure URL for filling out the form incorrectly
* nTemplateMailingId: template ID for sending a corresponding mailing
* bulkMail: When set to true, enables usage bulk api from GCNotify


topics_details
Expand Down

0 comments on commit 1147875

Please sign in to comment.