Skip to content

Commit

Permalink
adding the injector for all devices at once
Browse files Browse the repository at this point in the history
  • Loading branch information
deweirdt committed Oct 26, 2023
1 parent 6885e74 commit 2f8636b
Show file tree
Hide file tree
Showing 6 changed files with 319 additions and 216 deletions.
140 changes: 140 additions & 0 deletions amqp.controller.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
const amqp = require('amqplib/callback_api');
require('dotenv').config();

var offlinePubQueue = [];
var amqpConn = null;
var channel = null;

module.exports = {
setupConnection,
publish,
consume
}

function setupConnection() {
console.log("Startup AMQPConnection");
amqpConnection = 'amqp://' + process.env.RABBIT_MQ_USERNAME +':'+process.env.RABBIT_MQ_PASSWORD +'@'+process.env.RABBIT_MQ_HOST;
amqp.connect(amqpConnection, function (err, conn) {

if (err) {
console.error("[AMQP]", err.message);
console.log("[AMQP] retrying");
return setTimeout(setupConnection, 1000);
}
conn.on("error", function(err) {
console.log("[AMQP] error happened: ", err);
if (err.message !== "Connection closing") {
console.error("[AMQP] conn error", err.message);
}
});
conn.on("close", function() {
console.error("[AMQP] reconnecting");
return setTimeout(setupConnection, 1000);
});
console.log("[AMQP] connected");
amqpConn = conn;
whenConnected();
});

}

function whenConnected() {
startPublisher();
}

function startPublisher() {
amqpConn.createConfirmChannel(function(err, ch) {
if(closeOnErr(err)) return;

ch.on("error", function(err) {
console.log("[AMQP] channel error", err.message);
});

ch.on("close", function() {
console.log("[AMQP] channel closed")
});

ch.prefetch(10);
ch.assertExchange(process.env.RABBIT_MQ_EXCHANGE, 'fanout', {
durable: true
});

channel = ch;
console.log("Flushing the queued messages");
while(true) {
queuedMessage = offlinePubQueue.shift();
if( !queuedMessage ) {
console.log("No message in the cached sending queue");
break;
}
publish(queuedMessage);
}
});
}

function publish(data) {
try {
channel.publish(process.env.RABBIT_MQ_EXCHANGE,
'',
Buffer.from(JSON.stringify(data)),
{ persistent: true },
function(err,ok) {
if(err) {
console.error("Could not publish put on local stack", err);
offlinePubQueue.push(data);
channel.connection.close();
} else {
//console.log("Message correctly stored on the AMQP");
}

});
} catch(e) {
console.log("Exception pushing on broker", e.message);
offlinePubQueue.push(data);
console.log("local stack is containing: %d, items", offlinePubQueue.length);
}
}

function closeOnErr(err) {
if(!err) return false;
console.log("[AMQP] error", err);
amqpConn.close();
return true;
}

function consume(exchangeName, queueName, consumeMethod) {
try {
channel.assertQueue(queueName, {durable: true}, function(err, data) {
channel.bindQueue(queueName, exchangeName, '');
});
channel.on("close", function() {
console.log("Channel was closed");
setTimeout(consume, 1000, exchangeName, queueName, consumeMethod);
});
channel.on("close", function() {
console.log("Channel was closed");
setTimeout(consume, 1000, exchangeName, queueName, consumeMethod);
});

console.log("Channel has been configred exchangeName: %s, queueName: %s", exchangeName, queueName);
channel.consume(queueName, function(msg) {
//Call the callback function to the user
consumeMethod(msg.content.toString(), function(processed) {
try{
if(processed) {
console.log("Message got processed, we can ack");
channel.ack(msg);
} else {
console.log("Don't do anything I guess, message should remain where it is");
}
} catch(e) {
console.log("Exception when consuming data");
}
})
}, {noAck: false});
}
catch(e) {
console.log("Channel is not configured yet");
setTimeout(consume, 1000, exchangeName, queueName, consumeMethod);
}
}
14 changes: 14 additions & 0 deletions domuz-lookup.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[
{
"ip" : "192.168.1.212",
"alias" : "Boven"
},
{
"ip" : "192.168.1.211",
"alias" : "Living"
},
{
"ip" : "192.168.1.192",
"alias" : "TechRuimte"
}
]
34 changes: 7 additions & 27 deletions domuz-processor.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,9 @@
var amqp = require('amqp-connection-manager');
const amqp = require('./amqp.controller');
require('dotenv').config();
const Influx = require('influx');
const nano = require('nano-seconds');


amqpConnection = 'amqp://' + process.env.AMQP_USERNAME +':'+process.env.AMQP_PASSWORD +'@'+process.env.AMQP_HOST;
const connection = amqp.connect([amqpConnection]);
connection.on('connect', () => console.log('AMQP Connected!'));
connection.on('disconnect', err => console.log('AMQP Disconnected.', err.stack));

// Set up a channel listening for messages in the queue.
var channelWrapper = connection.createChannel({
setup: channel => {
// `channel` here is a regular amqplib `ConfirmChannel`.
return Promise.all([
channel.assertQueue(process.env.AMQP_QUEUE, { durable: true }),
channel.assertExchange(process.env.AMQP_EXCHANGE, 'fanout'),
channel.prefetch(1),
channel.bindQueue(process.env.AMQP_QUEUE, process.env.AMQP_EXCHANGE, ''),
channel.consume(process.env.AMQP_QUEUE, onMessage)
])
}
});

channelWrapper.waitForConnect()
.then(function() {
console.log("Listening for AMQP messages");
});

const influx = new Influx.InfluxDB({
host: process.env.INFLUX_HOST,
Expand All @@ -40,10 +17,13 @@ influx.getDatabaseNames()
}
})
.then(() => {
//This will connect to the rabbitMQ
amqp.setupConnection();
amqp.consume(process.env.RABBIT_MQ_EXCHANGE, process.env.RABBIT_MQ_QUEUE, processMsg);
});

const onMessage = data => {
var message = JSON.parse(data.content.toString());
function processMsg(msg, confirm) {
var message = JSON.parse(msg);
console.log("Received message: %j", message);
//storePumpState(message);
var promiseList = [];
Expand All @@ -60,7 +40,7 @@ const onMessage = data => {
//console.log("List of promisses: ", promiseList);
Promise.all(promiseList).then((values) => {
console.log("All done");
channelWrapper.ack(data);
confirm(true);
});
}

Expand Down
107 changes: 47 additions & 60 deletions injector.js
Original file line number Diff line number Diff line change
@@ -1,83 +1,80 @@
require('dotenv').config()
const amqp = require('amqp-connection-manager');
const amqp = require('./amqp.controller');
const xml2js = require('xml2js');
const domuzDevices = require('./domuz-lookup.json');


const http = require('http');
const { read } = require('fs');
const { start } = require('repl');

parser = new xml2js.Parser( {
normalizeTags: true,
normalize: true,
});

amqpConnection = 'amqp://' + process.env.AMQP_USERNAME +':'+process.env.AMQP_PASSWORD +'@'+process.env.AMQP_HOST;

const connection = amqp.connect([amqpConnection]);

connection.on('connect', () => console.log('AMQP Connected!'));
connection.on('disconnect', err => console.log('AMQP Disconnected.', err.stack));

// Create a channel wrapper
const channelWrapper = connection.createChannel({
json: true,
setup: channel => channel.assertExchange(process.env.AMQP_EXCHANGE, 'fanout')
});
function startReadout() {
console.log("Starting readout");
domuzDevices.forEach( (domuzDevice) => {
readoutDevice(domuzDevice);
});
console.log("Done reading");
}

const options = {
//...
hostname: process.env.DOMUZ,
path: '/data/static.xml',
timeout: 3000,
};
function readoutDevice(domuzDevice) {
console.log("Reading data from: ", domuzDevice.alias);

console.log("Domuz connection are: %j", options);
const options = {
//...
hostname: domuzDevice.ip,
path: '/data/static.xml',
timeout: 3000,
};

function readDomuzdata() {
var req = http.get(options, (resp) => {
var packet = '';

resp.on('data', (data) => {
packet += data;
let packet = '';

resp.on('data', (chunk) => {
//console.log("Received part of the data");
packet += chunk;
});

resp.on("end", () => {
resp.on('end', () => {
//console.log("Got data for: " + domuzDevice.alias + " data: ", packet);
try {
parser.parseString(packet, function(err, result) {
if(err) {
console.log("Problem in parsing: ", err);

} else {
domuzData = parseData(result);
console.log("Result is: %s", domuzData.date);
publishDomuzData(domuzData);

console.log("Result is received for: ", domuzDevice.alias);

for(let i=0;i<500;i++) {
amqp.publish(domuzData);
}


amqp.publish(domuzData);
}
packet = '';
});
} catch(err) {
console.log("Issue with parsing data: ", err);
packet = '';
}

});
}).on("error", (err) => {
// No idea why I'm having this expection here, just hide it :)
//console.log("Error: ", err.message);
})

resp.on("error", (err) => {
console.log("Error received: ", err);
});

}).end();
req.on('error', function(e) {
console.log("Req error: ", e);
req.abort();
});
req.on('timeout', function(e) {
console.log("Req timeout: ", e);
req.abort();
});
req.on('uncaughtException', function(e) {
console.log("Req uncaughtException: ", e);
req.abort();
});
}


function parseData(data) {

let heatArea = [];
Expand Down Expand Up @@ -142,20 +139,10 @@ function getHeaterMode(data) {
return mode;
}

function publishDomuzData(domuzData) {
console.log("Publishing %j", domuzData);
channelWrapper.publish(process.env.AMQP_EXCHANGE, '', JSON.parse(JSON.stringify(domuzData)), { contentType: 'application/json', persistent: true })
.then(function() {
console.log("Message pushed on the AMPQ");
})
.catch(err => {
console.log("Message was rejected:", err.stack);
channelWrapper.close();
connection.close();
});
}

//readDomuzdata();
//Read each minute
//setInterval(readDomuzdata, 5000);
setInterval(readDomuzdata, 60000);
amqp.setupConnection();

startReadout();
setInterval(startReadout, 5000);
console.log('Starting Domuz readout');

Loading

0 comments on commit 2f8636b

Please sign in to comment.