diff --git a/pom.xml b/pom.xml
index 19f0109..b00c44a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,6 +135,12 @@
3.2.4
test
+
+ org.skyscreamer
+ jsonassert
+ 1.5.0
+ test
+
diff --git a/src/main/java/no/entra/bacnet/agent/mqtt/AzureIoTMqttClient.java b/src/main/java/no/entra/bacnet/agent/mqtt/AzureIoTMqttClient.java
index 581b629..dfd3669 100644
--- a/src/main/java/no/entra/bacnet/agent/mqtt/AzureIoTMqttClient.java
+++ b/src/main/java/no/entra/bacnet/agent/mqtt/AzureIoTMqttClient.java
@@ -3,7 +3,10 @@
import com.microsoft.azure.sdk.iot.device.DeviceClient;
import com.microsoft.azure.sdk.iot.device.IotHubClientProtocol;
import com.microsoft.azure.sdk.iot.device.Message;
+import no.entra.bacnet.agent.devices.DeviceId;
+import no.entra.bacnet.agent.mqtt.azureiot.Observation;
import no.entra.bacnet.agent.mqtt.azureiot.SendReceive;
+import no.entra.bacnet.rec.ConfigurationRequest;
import no.entra.bacnet.rec.RealEstateCore;
import org.slf4j.Logger;
@@ -35,23 +38,46 @@ public AzureIoTMqttClient(String deviceConnectionString) throws URISyntaxExcepti
connect();
}
+ /**
+ * Used for testing
+ */
+ protected AzureIoTMqttClient() {
+ log.warn("Creating AzureIoTMqttClient for offline testing only.");
+ }
+
@Override
- public void publishRealEstateCore(RealEstateCore message, Optional senderAddress) {
- if (message != null) {
- String msgStr = message.toJson();
- Message msg = new Message(msgStr);
- msg.setContentTypeFinal("application/json");
- if (senderAddress.isPresent()) {
- msg.setProperty(MESSAGE_FROM, senderAddress.get().toString());
- }
- msg.setProperty(MESSAGE_TYPE, REAL_ESTATE_CORE);
- msg.setMessageId(java.util.UUID.randomUUID().toString());
- msg.setExpiryTime(D2C_MESSAGE_TIMEOUT);
+ public void publishRealEstateCore(RealEstateCore recMessage, DeviceId recDeviceId, Optional senderAddress) {
+ if (recMessage != null) {
+ Message msg = buildMqttMessage(recMessage, recDeviceId, senderAddress);
sendMessage(msg);
}
}
+ public Message buildMqttMessage(RealEstateCore recMessage, DeviceId recDeviceId, Optional senderAddress) {
+ String msgStr = "{ \n";
+ if (recDeviceId != null) {
+ msgStr += "\"deviceId\": \"" + recDeviceId.getId() + "\",\n";
+ }
+ if (recMessage != null) {
+ if (recMessage instanceof ConfigurationRequest) {
+ msgStr += "\"configurations\": [" + recMessage.toJson() + "]\n";
+ } else if (recMessage instanceof Observation) {
+ msgStr += "\"observations\": [" + recMessage.toJson() + "]\n";
+ }
+ }
+ msgStr += "}";
+ Message msg = new Message(msgStr);
+ msg.setContentTypeFinal("application/json");
+ if (senderAddress.isPresent()) {
+ msg.setProperty(MESSAGE_FROM, senderAddress.get().toString());
+ }
+ msg.setProperty(MESSAGE_TYPE, REAL_ESTATE_CORE);
+ msg.setMessageId(java.util.UUID.randomUUID().toString());
+ msg.setExpiryTime(D2C_MESSAGE_TIMEOUT);
+ return msg;
+ }
+
@Override
public void publishUnknownHexString(String hexString) {
Message msg = new Message(hexString);
diff --git a/src/main/java/no/entra/bacnet/agent/mqtt/MqttClient.java b/src/main/java/no/entra/bacnet/agent/mqtt/MqttClient.java
index 5a21510..6aae329 100644
--- a/src/main/java/no/entra/bacnet/agent/mqtt/MqttClient.java
+++ b/src/main/java/no/entra/bacnet/agent/mqtt/MqttClient.java
@@ -1,5 +1,6 @@
package no.entra.bacnet.agent.mqtt;
+import no.entra.bacnet.agent.devices.DeviceId;
import no.entra.bacnet.rec.RealEstateCore;
import java.net.InetAddress;
@@ -7,6 +8,6 @@
public interface MqttClient {
- void publishRealEstateCore(RealEstateCore message, Optional sourceAddress);
+ void publishRealEstateCore(RealEstateCore message, DeviceId recDeviceId, Optional sourceAddress);
void publishUnknownHexString(String hexString);
}
diff --git a/src/main/java/no/entra/bacnet/agent/observer/BlockingRecordAndForwardObserver.java b/src/main/java/no/entra/bacnet/agent/observer/BlockingRecordAndForwardObserver.java
index b35d0a7..dd69512 100644
--- a/src/main/java/no/entra/bacnet/agent/observer/BlockingRecordAndForwardObserver.java
+++ b/src/main/java/no/entra/bacnet/agent/observer/BlockingRecordAndForwardObserver.java
@@ -58,11 +58,11 @@ public void bacnetHexStringReceived(InetAddress sourceAddress, String hexString)
if (bacnetJson != null) {
RealEstateCore message = null;
try {
- DeviceId deviceId = findDeviceId(bacnetJson);
+ DeviceId recDeviceId = findDeviceId(bacnetJson);
message = Bacnet2Rec.bacnetToRec(bacnetJson);
if (message != null) {
message.setSenderAddress(sourceAddress.toString());
- mqttClient.publishRealEstateCore(message, Optional.of(sourceAddress));
+ mqttClient.publishRealEstateCore(message, recDeviceId, Optional.of(sourceAddress));
log.info("Message is published from bacnetJson: {}", bacnetJson);
} else {
log.trace("Could not send empty message from bacnetJson: {}", bacnetJson);
diff --git a/src/test/java/no/entra/bacnet/agent/mqtt/AzureIoTMqttClientTest.java b/src/test/java/no/entra/bacnet/agent/mqtt/AzureIoTMqttClientTest.java
new file mode 100644
index 0000000..07c0a96
--- /dev/null
+++ b/src/test/java/no/entra/bacnet/agent/mqtt/AzureIoTMqttClientTest.java
@@ -0,0 +1,59 @@
+package no.entra.bacnet.agent.mqtt;
+
+import com.microsoft.azure.sdk.iot.device.Message;
+import no.entra.bacnet.agent.devices.DeviceId;
+import no.entra.bacnet.rec.ConfigurationRequest;
+import no.entra.bacnet.rec.helpers.DateTimeHelper;
+import org.junit.Before;
+import org.junit.Test;
+import org.skyscreamer.jsonassert.JSONAssert;
+
+import java.time.LocalDateTime;
+import java.util.Optional;
+
+import static org.junit.Assert.assertNotNull;
+
+public class AzureIoTMqttClientTest {
+
+ private LocalDateTime observationTime;
+ private AzureIoTMqttClient mqttClient;
+ private String expectedJson = "{\n" +
+ " \"deviceId\": \"id1234\",\n" +
+ " \"configurations\": [{\n" +
+ " \"sender\": \"sendt from\",\n" +
+ " \"recipient\": \"recip\",\n" +
+ " \"observationTime\": \"2019-12-09T20:57:17.776468\",\n" +
+ " \"id\": \"123\",\n" +
+ " \"type\": \"IHave\",\n" +
+ " \"properties\": {\n" +
+ " \"ObjectName\": \"TFM434\"\n" +
+ " }\n" +
+ " }]\n" +
+ "}";
+
+ @Before
+ public void setUp() throws Exception {
+ String localDateTime = "2019-12-09T20:57:17.776468";
+ observationTime = DateTimeHelper.fromIsoString(localDateTime);
+ mqttClient = new AzureIoTMqttClient();
+ }
+
+ @Test
+ public void buildMqttMessage() {
+ ConfigurationRequest recMessage = new ConfigurationRequest();
+ recMessage.setObservationTime(observationTime);
+ recMessage.setId("123");
+ recMessage.setRecipient("recip");
+ recMessage.setSender("sendt from");
+ recMessage.setType("IHave");
+ recMessage.addProperty("ObjectName", "TFM434");
+ DeviceId recDeviceId = new DeviceId("id1234");
+ recDeviceId.setTfmTag("TFM767");
+ Message mqttMessage = mqttClient.buildMqttMessage(recMessage, recDeviceId, Optional.empty());
+ assertNotNull(mqttMessage);
+ byte[] bodyBytes = mqttMessage.getBytes();
+ String bodyJson = new String(bodyBytes);
+ assertNotNull(bodyJson);
+ JSONAssert.assertEquals(expectedJson, bodyJson, true);
+ }
+}
\ No newline at end of file