forked from alerta/alerta-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathalerta_influxdb.py
105 lines (78 loc) · 3.17 KB
/
alerta_influxdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import logging
import os
from datetime import datetime

try:
    from alerta.plugins import app  # alerta >= 5.0
except ImportError:
    from alerta.app import app  # alerta < 5.0
from alerta.plugins import PluginBase
from influxdb import InfluxDBClient

LOG = logging.getLogger('alerta.plugins.influxdb')

# DSN format: 'influxdb://username:password@localhost:8086/databasename'
DEFAULT_INFLUXDB_DSN = 'influxdb://user:pass@localhost:8086/alerta'

# Every setting is read from the process environment first and falls back to
# the Alerta app config (and then to the default, if any) when the environment
# variable is unset or empty.
INFLUXDB_DSN = os.environ.get('INFLUXDB_DSN') or app.config.get('INFLUXDB_DSN', DEFAULT_INFLUXDB_DSN)

# Optional database name; when set it takes precedence over the database
# named in the DSN.
INFLUXDB_DATABASE = os.environ.get('INFLUXDB_DATABASE') or app.config.get('INFLUXDB_DATABASE')

# Specify the name of a measurement to which all alerts will be logged
INFLUXDB_MEASUREMENT = os.environ.get('INFLUXDB_MEASUREMENT') or app.config.get('INFLUXDB_MEASUREMENT', 'event')
class InfluxDBWrite(PluginBase):
    """Alerta plugin that records alerts in InfluxDB.

    Every received alert, and every status change to ``ack`` or ``assign``,
    is written as a single point to the measurement named by
    ``INFLUXDB_MEASUREMENT``.
    """

    # Status changes that are recorded; any other status is ignored.
    _RECORDED_STATUSES = ('ack', 'assign')

    def __init__(self, name=None):
        """Create the InfluxDB client and select the target database.

        The database is ``INFLUXDB_DATABASE`` when set, otherwise the one
        named in ``INFLUXDB_DSN``. Failures while selecting or creating the
        database are logged and do not prevent the plugin from loading.

        :param name: plugin name, passed through to ``PluginBase``.
        """
        self.client = InfluxDBClient.from_dsn(INFLUXDB_DSN, timeout=2)

        # An explicit database setting overrides the one parsed from the DSN.
        dbname = INFLUXDB_DATABASE or self.client._database
        if dbname:
            try:
                self.client.switch_database(dbname)
                self.client.create_database(dbname)
            except Exception as e:
                # Best effort: log the failure and carry on so the plugin
                # still loads.
                LOG.error('InfluxDB: ERROR - %s', e)

        super(InfluxDBWrite, self).__init__(name)

    def pre_receive(self, alert, **kwargs):
        """Return the alert unchanged; nothing is done before processing.

        Extra keyword arguments (newer Alerta releases pass some, such as
        ``config``) are accepted and ignored.
        """
        return alert

    def _influxdb_prepare_point(self, alert, status=None, text=None):
        """Build the InfluxDB point (a dict) describing ``alert``.

        :param alert: the alert to describe.
        :param status: when given, it is used as the ``status`` tag instead
            of ``alert.status`` and the point is stamped with the current
            UTC time instead of ``alert.create_time``.
        :param text: optional text stored in the ``text`` field.
        :returns: dict with ``measurement``, ``time``, ``tags`` and
            ``fields`` keys.
        """
        tags = {}
        for tag in alert.tags:
            # Only 'key=value' style alert tags become InfluxDB tags; tags
            # without an '=' are skipped.
            key, sep, val = tag.partition('=')
            if sep:
                tags[key] = val

        # Applied after the alert's own tags, so these built-in tags win on
        # a name clash.
        tags.update(
            event=alert.event,
            resource=alert.resource,
            environment=alert.environment,
            severity=alert.severity,
            status=status if status else alert.status,
            service=','.join(alert.service)
        )
        if alert.customer:
            tags.update(customer=alert.customer)

        # make sure we store the value in its original format
        if isinstance(alert.value, (float, int)):
            value = alert.value
        else:
            value = str(alert.value)

        fields = {'value': value}
        if text:
            fields['text'] = text

        # event data
        return {
            'measurement': INFLUXDB_MEASUREMENT,
            'time': datetime.utcnow() if status else alert.create_time,
            'tags': tags,
            'fields': fields
        }

    def _write_point(self, point):
        """Write one point to InfluxDB with millisecond time precision.

        :raises RuntimeError: if the write fails for any reason.
        """
        LOG.debug('InfluxDB: point=%s', point)
        try:
            self.client.write_points([point], time_precision='ms')
        except Exception as e:
            raise RuntimeError("InfluxDB: ERROR - %s" % e)

    def post_receive(self, alert, **kwargs):
        """Write a point for a received alert.

        Extra keyword arguments are accepted and ignored.

        :raises RuntimeError: if the write to InfluxDB fails.
        """
        self._write_point(self._influxdb_prepare_point(alert))

    def status_change(self, alert, status, text, **kwargs):
        """Write a point when an alert's status changes to ``ack`` or ``assign``.

        Any other status is ignored. Extra keyword arguments are accepted
        and ignored.

        :raises RuntimeError: if the write to InfluxDB fails.
        """
        if status not in self._RECORDED_STATUSES:
            return

        self._write_point(self._influxdb_prepare_point(alert, status, text))