Skip to content

Commit

Permalink
Update the authentification configuration for the IoTDB handler
Browse files Browse the repository at this point in the history
Add git ignore for the docker generated files
Add Read method to IoTDBHandler
Rename Data Type Object
Apply format and remove invalid comment
Optimize code to standarize the response between handlers.
Repair documentation for the project configuration
Add anonym data
Manage Sessions
Add Write method add client message data to IoTDB

Signed-off-by: Christian Muehlbauer <[email protected]>
  • Loading branch information
chrizmc committed Jul 31, 2024
1 parent 72ab98b commit c075e51
Show file tree
Hide file tree
Showing 10 changed files with 843 additions and 49 deletions.
28 changes: 26 additions & 2 deletions cdsp/information-layer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,36 @@ The Hello World example in our case is quite simple. We feed an updated value fo
```

### IoTDB
- Not yet supported
- Ensure to start and run Docker containers defined in a [Docker Compose file](/docker/).
- Ensure that in the IoTDB CLI there is a `root.Vehicles` *database* like this:

```
IoTDB> show databases;
+-------------+----+-----------------------+---------------------+---------------------+
| Database| TTL|SchemaReplicationFactor|DataReplicationFactor|TimePartitionInterval|
+-------------+----+-----------------------+---------------------+---------------------+
|root.Vehicles|null| 1| 1| 604800000|
+-------------+----+-----------------------+---------------------+---------------------+
Total line number = 1
It costs 0.004s
```

- Create two *timeseries* with the `root.Vehicles.VIN` and some VSS data. At the moment only 1 data point is supported, namely `root.Vehicles.Vehicle_Cabin_HVAC_AmbientAirTemperature`. Here you can see how the vehicle document within the *Vehicles* should look like in IoTDB CLI:

```
IoTDB> show timeseries;
+------------------------------------------------------+-----+-------------+--------+--------+-----------+----+----------+--------+------------------+--------+
| Timeseries|Alias| Database|DataType|Encoding|Compression|Tags|Attributes|Deadband|DeadbandParameters|ViewType|
+------------------------------------------------------+-----+-------------+--------+--------+-----------+----+----------+--------+------------------+--------+
|root.Vehicles.Vehicle_Cabin_HVAC_AmbientAirTemperature| null|root.Vehicles| FLOAT| RLE| LZ4|null| null| null| null| BASE|
| root.Vehicles.VIN| null|root.Vehicles| TEXT| PLAIN| LZ4|null| null| null| null| BASE|
+------------------------------------------------------+-----+-------------+--------+--------+-----------+----+----------+--------+------------------+--------+
```

## [Start](./router/README.md#Run) the Database Router

## Look out for the Websocket Server message in the console
Now you can changed the value of `Vehicle_Cabin_HVAC_AmbientAirTemperature` in ATLAS cloud to let's say `23`. After changing you should immediately see this line in console:
If you are running the RealmDB handler and you change the value of `Vehicle_Cabin_HVAC_AmbientAirTemperature` in ATLAS cloud (let's say `23`), you should immediately see this line in console:

```
the value of "Vehicle_Cabin_HVAC_AmbientAirTemperature" changed to 23
Expand Down
13 changes: 8 additions & 5 deletions cdsp/information-layer/handlers/iotdb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ Create `config/config.js` with the following format, replacing the app id and th

```js
module.exports = {
iotdbHost: "your-iotdb-host", // Default "localhost"
iotdbPort: 6667, // Set this to the appropriate IotDB Port
iotdbUser: "your-iotdb-user", // Default "root"
iotdbPassword: "your-iotdb-password", // Default "root"
timeZone: "UTC+2", // Set this to the appropriate time zone
module.exports = {
iotdbHost: "your-iotdb-host", // Default "localhost"
iotdbPort: 6667, // Set this to the appropriate IotDB Port
iotdbUser: "your-iotdb-user", // Default "root"
iotdbPassword: "your-iotdb-password", // Default "root"
timeZoneId: Intl.DateTimeFormat().resolvedOptions().timeZone, // Set this to the appropriate time zone
fetchSize: 10000, // number of rows that will be fetched from the database at a time when executing a query
};
};
```

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
const JSDataType = require("../utils/IoTDBConstants");

const MeasurementType = Object.freeze({
VIN: JSDataType.TEXT,
Vehicle_Cabin_HVAC_AmbientAirTemperature: JSDataType.FLOAT,
});

module.exports = MeasurementType;
269 changes: 244 additions & 25 deletions cdsp/information-layer/handlers/iotdb/src/iotdb-handler.js
Original file line number Diff line number Diff line change
@@ -1,70 +1,289 @@
const Thrift = require("thrift");
const Client = require("../gen-nodejs/IClientRPCService");
const {
TSExecuteStatementReq,
TSOpenSessionReq,
TSProtocolVersion,
TSCloseSessionReq,
TSInsertRecordReq,
} = require("../gen-nodejs/client_types");
const thrift = require("thrift");
const Handler = require("../../handler");
const IClientRPCService = require("../gen-nodejs/IClientRPCService");
const ttypes = require("../gen-nodejs/client_types");
const config = require("../config/config");
const { Session } = require("inspector");
const SessionDataSet = require("../utils/SessionDataSet");
const { IoTDBDataInterpreter } = require("../utils/IoTDBDataInterpreter");
const MeasurementType = require("../config/MeasurementsType");

const sendMessageToClient = (ws, message) => {
ws.send(JSON.stringify(message));
};

class IoTDBHandler extends Handler {
constructor() {
super();
this.connection = null;
this.client = null;
this.sendMessageToClients = null;
this.sessionId = null;
this.statementId = 0;
this.protocolVersion = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3;
this.statementId = null;
this.zoneId = config.timeZoneId;
this.fetchSize = config.fetchSize;
this.isSessionClosed = true;
}

async authenticateAndConnect(sendMessageToClients) {
try {
this.sendMessageToClients = sendMessageToClients;

this.connection = Thrift.createConnection(
const connection = thrift.createConnection(
config.iotdbHost,
config.iotdbPort,
{
transport: Thrift.TFramedTransport,
protocol: Thrift.TBinaryProtocol,
transport: thrift.TFramedTransport,
protocol: thrift.TBinaryProtocol,
}
);

this.client = Thrift.createClient(IClientRPCService, this.connection);
this.client = thrift.createClient(Client, connection);

this.connection.on("error", (err) => {
console.error("Thrift connection error:", err);
connection.on("error", (err) => {
console.error("thrift connection error:", err);
});

console.log("Successfully connected to IoTDB using Thrift");

this.open_session();
console.log("Successfully connected to IoTDB using thrift.");
} catch (error) {
console.error("Failed to authenticate with IoTDB:", error);
}
}

async open_session() {
async read(message, ws) {
const objectId = message.data.VIN;
const sql = `SELECT * FROM root.Vehicles WHERE root.Vehicles.VIN = '${objectId}'`;
try {
const openSessionReq = new ttypes.TSOpenSessionReq({
username: config.iotdbUser,
password: config.iotdbPassword,
client_protocol: ttypes.TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3, // Set the appropriate protocol version
zoneId: config.timeZone,
await this.openSession();
const sessionDataSet = await this.#executeQueryStatement(sql);

// Check if the response contains data
if (sessionDataSet == {}) {
console.log("No data found in query response.");
sendMessageToClient(
ws,
JSON.stringify({ error: "No data found in query response." })
);
} else {
let mediaElements = [];
while (sessionDataSet.hasNext()) {
const mediaElement = sessionDataSet.next();
mediaElements.push(mediaElement);
}
if (mediaElements.length > 0) {
for (let i = 0; i < mediaElements.length; ++i) {
const transformedObject =
IoTDBDataInterpreter.extractDeviceIdFromTimeseries(
mediaElements[i]
);
const response = {
type: "read_response",
data: transformedObject,
};
sendMessageToClient(ws, JSON.stringify(response));
}
} else {
sendMessageToClient(
ws,
JSON.stringify({ error: "Object not found." })
);
}
}
} catch (error) {
console.error("Failed to read data from IoTDB: ", error);
} finally {
this.closeSession();
}
}

async write(message, ws) {
let measurements = [];
let dataTypes = [];
let values = [];
try {
await this.openSession();
Object.entries(message.data).forEach(async ([key, value]) => {
measurements.push(key);
dataTypes.push(MeasurementType[key]);
values.push(value);
});
let deviceId = "root.Vehicles";
const timestamp = new Date().getTime();
const status = await this.#insertRecord(
deviceId,
timestamp,
measurements,
dataTypes,
values
);

const response = `insert one record to device ${deviceId}, message: ${status.message}`;
console.log(response);
sendMessageToClient(ws, response);
} catch (error) {
console.error("Failed to write data from IoTDB: ", error);
} finally {
this.closeSession();
}
}

/**
* Opens a session with the IoTDB server using the provided credentials and configuration.
*/
async openSession() {
if (!this.isSessionClosed) {
console.info("The session is already opened.");
return;
}
const openSessionReq = new TSOpenSessionReq({
username: config.iotdbUser,
password: config.iotdbPassword,
client_protocol: this.protocolVersion,
zoneId: config.timeZoneId,
configuration: { version: "V_0_13" },
});

try {
const resp = await this.client.openSession(openSessionReq);

if (this.protocolVersion != resp.serverProtocolVersion) {
console.log(
"Protocol differ, Client version is " +
this.protocolVersion +
", but Server version is " +
resp.serverProtocolVersion
);
// version is less than 0.10
if (resp.serverProtocolVersion == 0) {
throw new Error("Protocol not supported.");
}
}

this.sessionId = resp.sessionId;
console.log(
"Successfully authenticated with IoTDB, session ID:",
this.sessionId
this.statementId = await this.client.requestStatementId(this.sessionId);
this.isSessionClosed = false;
console.log("Session started!");
} catch (error) {
console.error("Failed starting session with IoTDB: ", error);
}
}

isSessionOpen() {
return !this.isSessionClosed;
}

closeSession() {
if (this.isSessionClosed) {
console.info("Session is already closed.");
return;
}

let req = new TSCloseSessionReq({
sessionId: this.sessionId,
});

try {
this.client.closeSession(req);
} catch (error) {
console.error(
"Error occurs when closing session at server. Maybe server is down. Error message: ",
err
);
} finally {
this.isSessionClosed = true;
console.log("Session closed!");
}
}

/**
* Executes a SQL query statement asynchronously.
*
* @param {string} sql - The SQL query statement to be executed.
* @returns {Promise<SessionDataSet|Object>} - Returns a SessionDataSet object if the query is successful,
* otherwise returns an empty object.
* @throws {Error} - Throws an error if the session is not open.
*/
async #executeQueryStatement(sql) {
try {
if (!this.sessionId) {
throw new Error("Session is not open. Please authenticate first.");
}

const request = new TSExecuteStatementReq({
sessionId: this.sessionId,
statement: sql,
statementId: this.statementId,
fetchSize: this.fetchSize,
timeout: 0,
});

const resp = await this.client.executeQueryStatement(request);

if (!resp || !resp.queryDataSet || !resp.queryDataSet.valueList) {
return {};
} else {
return new SessionDataSet(
resp.columns,
resp.dataTypeList,
resp.columnNameIndexMap,
resp.queryId,
this.client,
this.statementId,
this.sessionId,
resp.queryDataSet,
resp.ignoreTimeStamp
);
}
} catch (error) {
console.error("Failed starting session with IoTDB:", error);
console.error("Failed executing query statement: ", error);
}
}

/**
* Inserts a record into the time series database.
* @param {string} deviceId - The ID of the device.
* @param {number} timestamp - The timestamp of the record.
* @param {string[]} measurements - Array of measurement names.
* @param {string[]} dataTypes - Array of data types for each value.
* @param {any[]} values - Array of values to be inserted.
* @param {boolean} isAligned - Flag indicating if the data is aligned.
* @returns {Promise<any>} - A promise that resolves with the result of the insertion.
* @throws {string} - Throws an error if lengths of data types, values, and measurements do not match.
*/
async #insertRecord(
deviceId,
timestamp,
measurements,
dataTypes,
values,
isAligned = false
) {
if (
values.length != dataTypes.length ||
values.length != measurements.length
) {
throw "length of data types does not equal to length of values!";
}
const valuesInBytes = IoTDBDataInterpreter.serializeValues(
dataTypes,
values
);

let request = new TSInsertRecordReq({
sessionId: this.sessionId,
prefixPath: deviceId,
measurements: measurements,
values: valuesInBytes,
timestamp: timestamp,
isAligned: isAligned,
});
return await this.client.insertRecord(request);
}
}

module.exports = IoTDBHandler;
10 changes: 10 additions & 0 deletions cdsp/information-layer/handlers/iotdb/utils/IoTDBConstants.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
const JSDataType = Object.freeze({
BOOLEAN: 0,
INT32: 1,
INT64: 2,
FLOAT: 3,
DOUBLE: 4,
TEXT: 5,
});

module.exports = JSDataType;
Loading

0 comments on commit c075e51

Please sign in to comment.