forked from pbegg/signalk-to-influxdb-v2-buffering
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
439 lines (404 loc) · 19.3 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
// Filename: signalk-to-influxdb-v2-buffering
//
// Description: The plugin is designed to do batch writes to a cloud hosted influxdb2.0
// data base.The Plugin now uses the https://github.com/influxdata/influxdb-client-js
// library. If the connection to the influxdb is down the batch of metrics should be
// buffered and re-uploaded when the internet connection is re-established
//
// Repository: https://github.com/pbegg/signalk-to-influxdb-v2-buffering
//
// Updated: August 2022
//
// Notes: Aug 2022 add new functionality:
// - now able to push numeric, text and boolean data types to InfluxDB-V2, and other data types as JSON
// - added ability to push data from contexts outside of 'vessel.self'
// - added ability to expand properties of any measurement (i.e. building on previous ability
// to expand position and attitude, now any measurement with multiple properties can be expanded)
// - added ablity to add tags against each individual path
// - added the source and context as tags for each measurement
// - improved handling of wildcard '*' for context and path
// - lots of unecessary refactoring and tidy-up
const { InfluxDB, Point, HttpError } = require('@influxdata/influxdb-client')
module.exports = function (app) {
let options;
let writeApi;
let unsubscribes = [];
let tagAsSelf = false;
let selfContext;
let getSelfContext = function () {
// get the current 'vessel.self' context - this seems unnecessarily difficult due to
// limitations in the signalK network and may cause inconsistant results depending on
// whether UUID or MMSI is defined in the Vessel Base Data on the Server -> Settings page
const selfUuid = app.getSelfPath('uuid');
const selfMmsi = app.getSelfPath('mmsi');
if (selfUuid != null) { // not null or undefined value
return "vessels." + selfUuid;
} else if (selfMmsi != null) {
return "vessels.urn:mrn:imo:mmsi:" + selfMmsi.toString();
}
return null;
};
let addInfluxField = function (point, name, value, expand = false) {
switch (typeof value) {
case 'string':
point.stringField(name, value);
break;
case 'number':
// note that point.floatField will throw an exception for infinite
// values like[Fuel Economy = Speed / Rate], when fuel rate is 0
// https://github.com/pbegg/signalk-to-influxdb-v2-buffering/pull/13#discussion_r656656037
if (isFinite(value) && !isNaN(value)) {
point.floatField(name, value);
}
break;
case 'boolean':
point.booleanField(name, value);
break;
case 'object':
// if the 'expand' option is selected then add a field for each property
// (n.b.recursive / if any obects are self-referential this will break)
if (expand === true) {
for (const property in value) {
addInfluxField(point, property, value[property], true);
}
break;
}
// if 'expand' is false, drop through to send the object as JSON
default:
if (value != null) {
// could be an object, function, whatever... so stringify it...
// note that stringify also can't cope with self-references
point.stringField(name, JSON.stringify(value));
}
break;
}
}
let getInfluxPoint = function (source, context, path, value, timestamp, pathOption) {
// The Point object defines the value for a single measurement,
// and performs internal type and error checking for each value.
// Note:
// - the methods .intField and.uintField aren't used as all numeric values are mapped to floatField
// - any errors with floatField, stringField etc throw an exception thats caught by the calling function
const point = new Point(path)
.timestamp(Date.parse(timestamp));
// Add the value of the given field
addInfluxField(point, "value", value, (pathOption.expand == null ? false : pathOption.expand));
// Add path-level tags if any have been defined
if (pathOption.pathTags != null) {
pathOption.pathTags.forEach(tag => {
point.tag(tag["name"], tag["value"]);
});
}
// Add a tag for the source, in particular so that readings from different
// NMEA2K devices can be filtered - for example its common now for multiple
// measurements for 'navigation.position' to be received over NMEA2K from all
// the different squawky devices like GPS, AIS, radios, weather station etc...
if (source != null) {
point.tag("source", source);
}
// Add a tag with the context so its clear which UUID generated the update
if (context != null && context.length > 0) {
point.tag("context", context);
}
// Add a tag {self: true} when the measurement originates from this vessel -
// this is reliant on an MMSI or UUID to be set in the Vessel Base Data on
// the Server -> Settings page. Potentially it may be inconsistant depending
// on what UUID / MMSI is set so can be turned off on the plugin settings page,
// and manually added as a tag for individual path(s) if needed
if (tagAsSelf === true && context.localeCompare(selfContext) === 0) {
point.tag("self", true);
}
app.debug(`Sending to InfluxDB-V2: '${JSON.stringify(point)}'`);
return point
};
let handleUpdates = function (delta, pathOption) {
// iterate through each update received from the subscription manager
delta.updates.forEach(update => {
//if no u.values then return as there are no values to display
if (!update.values) {
return
}
// iterate through each value received in the update
update.values.forEach(val => {
try {
// if the value is an object and the 'expand' option is set true, each property will be
// unpacked into separate rows in InfluxDB. Note this is recursive - the code will loop to
// unpack properties at all layers (and hypothetically if any obects were self-referential
// this will throw an exception)
writeApi.writePoint(getInfluxPoint(
update["$source"],
delta.context,
val.path,
val.value,
update.timestamp,
pathOption));
} catch (error) {
// log any errors thrown (and skip writing this value to InfluxDB)
app.error(`Error: skipping updated value ${JSON.stringify(val)}`)
}
});
});
};
let _start = function (opts) {
app.debug(`${plugin.name} Started...`)
// set variables from plugin options
options = opts;
selfContext = getSelfContext();
const url = options["influxHost"];
const token = options["influxToken"];
const org = options["influxOrg"];
const bucket = options["influxBucket"];
const writeOptions = options["writeOptions"];
if (options["tagAsSelf"]) {
tagAsSelf = options["tagAsSelf"];
}
// create InfluxDB api object
writeApi = new InfluxDB({
url,
token
}).getWriteApi(
org,
bucket,
'ms',
writeOptions);
// add default (global) tags, if any have been defined
if (options.defaultTags != null) {
let defaultTags = {}
options.defaultTags.forEach(tag => {
defaultTags[tag["name"]] = tag["value"];
});
app.debug(`Default tags: ${JSON.stringify(defaultTags)}`);
writeApi.useDefaultTags(defaultTags);
}
// add subscriptions to signalK updates - note the subscription is created
// individually per path, as there may be different paremeters set for the context
options.pathArray.forEach(pathOption => {
// its useful to be able to turn paths on or off, when trying out options for setup of InfluxDB2.0
if (pathOption.enabled === true) {
// create a subsciption definition
localSubscription = {
"context": pathOption.context,
"subscribe": [{
"path": pathOption.path,
"policy": "instant",
"minPeriod": pathOption.interval
}]
};
// subscribe to updates for the context and path
app.subscriptionmanager.subscribe(
localSubscription,
unsubscribes,
subscriptionError => {
app.error('Error: ' + subscriptionError);
},
delta => {
// add a handler for this update
// app.debug(`Received delta: ${JSON.stringify(delta)}`);
handleUpdates(delta, pathOption);
}
);
app.debug(`Added subscription to: ${JSON.stringify(localSubscription)}`);
} else {
app.error(`Skipping subscription to: ${pathOption.context}/.../${pathOption.path}`);
}
});
};
let _stop = function (options) {
app.debug(`${plugin.name} Stopped...`)
unsubscribes.forEach(f => f());
unsubscribes = [];
};
const plugin = {
id: "signalk-to-influxdb-v2-buffer",
name: "Signalk To Influxdbv2.0",
schema: {
"type": "object",
"description": "This plugin saves data to an influxdbv2 database, and buffers data without an internet connection (note: a server restart is needed for updated settings to take effect)",
"required": [
"influxHost",
"influxToken",
"influxOrg",
"influxBucket",
"uploadFrequency"
],
"properties": {
"influxHost": {
"type": "string",
"title": "Influxdb2.0 Host URL",
"description": "the url to your cloud hosted influxb2.0"
},
"influxToken": {
"type": "string",
"title": "Influxdb2.0 Token",
"description": "the token for your cloud hosted influxb2.0 bucket"
},
"influxOrg": {
"type": "string",
"title": "Influxdb2.0 Organisation",
"description": "your Influxdb2.0 organisation"
},
"influxBucket": {
"type": "string",
"title": "Influxdb2.0 Bucket",
"description": "which bucket you are storing the metrics in"
},
"writeOptions": {
"type": "object",
"title": "Write Options",
"required": [
"batchSize",
"flushInterval",
"maxBufferLines",
"maxRetries",
"maxRetryDelay",
"minRetryDelay",
"retryJitter"
],
"properties": {
"batchSize": {
"type": "number",
"title": "Batch Size",
"description": "the maximum points/line to send in a single batch to InfluxDB server",
"default": 1000
},
"flushInterval": {
"type": "number",
"title": "Flush Interval",
"description": "maximum time in millis to keep points in an unflushed batch, 0 means don't periodically flush",
"default": 30000
},
"maxBufferLines": {
"type": "number",
"title": "Maximum Buffer Lines",
"description": "maximum size of the retry buffer - it contains items that could not be sent for the first time",
"default": 32000
},
"maxRetries": {
"type": "number",
"title": "Maximum Retries",
"description": "maximum delay between retries in milliseconds",
"default": 3
},
"maxRetryDelay": {
"type": "number",
"title": "Maximum Retry Delay",
"description": "maximum delay between retries in milliseconds",
"default": 5000
},
"minRetryDelay": {
"type": "number",
"title": "Minimum Retry Delay",
"description": "minimum delay between retries in milliseconds",
"default": 180000
},
"retryJitter": {
"type": "number",
"title": "Retry Jitter",
"description": "a random value of up to retryJitter is added when scheduling next retry",
"default": 200
}
}
},
"tagAsSelf": {
"type": "boolean",
"title": "Tag vessel measurements as 'self' if applicable",
"description": "tag measurements as {self: true} when from vessel.self - requires an MMSI or UUID to be set in the Vessel Base Data on the Server->Settings page",
"default": false
},
"defaultTags": {
"type": "array",
"title": "Default Tags",
"description": "default tags added to every measurement sent to InfluxDB",
"default": [],
"items": {
"type": "object",
"required": [
"name",
"value"
],
"properties": {
"name": {
"type": "string",
"title": "Tag Name"
},
"value": {
"type": "string",
"title": "Tag Value"
}
}
}
},
"pathArray": {
"type": "array",
"title": "Paths",
"default": [],
"items": {
"type": "object",
"required": [
"context",
"path",
"interval"
],
"properties": {
"enabled": {
"type": "boolean",
"title": "Enabled?",
"description": "enable writes to Influxdb2.0 for this path (server restart is required)",
"default": true
},
"context": {
"type": "string",
"title": "SignalK context",
"description": "context to record e.g.'self' for own ship, or 'vessels.*' for all vessels, or '*' for everything",
"default": "self"
},
"path": {
"type": "string",
"title": "SignalK path",
"description": "path to record e.g.'navigation.position' for positions, or 'navigation.*' for all navigation data, or '*' for everything",
},
"interval": {
"type": "number",
"description": "milliseconds between data records",
"title": "Recording interval",
"default": 1000
},
"expand": {
"type": "boolean",
"title": "Expand properties",
"description": "select to expand the properties of each measurement into separate rows where possible e.g. 'navigation.position' would expand into three rows for 'navigation.position.latitude','navigation.position.longitude' and 'navigation.position.altitude'. If not selected, the measurement is written to InfluxDB as one row where value={JSON}, and field tags are added for each property. We recommend to turn this on to avoid exceeding cardinality limits in Influx",
"default": true
},
"pathTags": {
"title": "Path tags",
"type": "array",
"description": "Define any tags to include for this path:",
"default": [],
"items": {
"type": "object",
"required": [
"name",
"value"
],
"properties": {
"name": {
"type": "string",
"title": "Tag Name"
},
"value": {
"type": "string",
"title": "Tag Value"
}
}
}
}
}
}
}
}
},
start: _start,
stop: _stop
}
return plugin
}