-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathmqtt_pushgateway.py
executable file
·227 lines (182 loc) · 7.6 KB
/
mqtt_pushgateway.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# kate: space-indent on; indent-width 4; replace-tabs on;
import re
import pytoml
import logging
import socket
import time
import json
import paho.mqtt.client as mqtt
from collections import defaultdict
from datetime import datetime, timedelta
from dateutil.parser import parse as parse_date, ParserError
from flask import Flask, Response, redirect
app = Flask("mqtt_pushgateway")
with open("config.toml") as fd:
config = pytoml.load(fd)
class Topic(object):
def __init__(self):
self.metric = None
self.keywords = {}
self.value = None
self.last_update = datetime.fromtimestamp(0)
self.expire = config["mqtt"].get("expire")
self.ignore = False
self.known_vals = set([])
self.is_numeric = True
def update(self, topic, value):
# topic is e.g. sensors/somewhere/temperature
# we want to apply our regexen, see if one matches
# if one matches, use it to determine self.metric and self.keywords
if self.metric is None:
for cfg_topic in config["topic"]:
if "match" in cfg_topic:
m = re.match(cfg_topic["match"], topic)
if m is not None:
self.keywords = m.groupdict()
self.expire = cfg_topic.get("expire")
self.ignore = cfg_topic.get("ignore", False)
if "__metric__" in self.keywords:
self.metric = self.keywords.pop("__metric__")
if "metric" in cfg_topic:
self.metric = cfg_topic["metric"]
break
if self.metric is None:
self.metric = topic.rsplit("/", 1)[1]
if self.expire is None:
self.expire = config["mqtt"].get("expire")
self.keywords["mqtt_topic"] = topic
def _try_float(value):
try:
return float(value)
except (TypeError, ValueError):
return None
def _try_date(value):
# See if YYYY-MM-DD or starts with YYYY-MM-DD[T ]HH:MM:SS
try:
if not re.match(r'^\d\d\d\d\-\d\d\-\d\d([T ]\d\d:\d\d:\d\d.*)?', value):
return None
except TypeError:
logging.warning('Failed to match "%r" against a regex' % value, exc_info=True)
return None # it's probably not a date then
try:
return parse_date(value).timestamp()
except ParserError:
return None
if (parsed_value := _try_float(value)) is not None:
self.value = parsed_value
self.is_numeric = True
elif (parsed_value := _try_date(value)) is not None:
self.value = parsed_value
self.is_numeric = True
else:
self.value = (value
.replace("\n", "")
.replace("\r", "")
.replace(" ", "")
.replace('"', ""))
self.known_vals.add(self.value)
self.is_numeric = False
self.last_update = datetime.now()
@property
def forget(self):
return datetime.now() - self.last_update > timedelta(hours=1)
def __str__(self):
data_age = (datetime.now() - self.last_update).total_seconds()
if self.is_numeric:
if self.expire is not None and data_age > self.expire:
# metric is expired, return data age only
template = 'mqtt_data_age{%(kwds)s,metric="%(metric)s"} %(age)f'
else:
template = ('%(metric)s{%(kwds)s} %(value)f\n'
'mqtt_data_age{%(kwds)s,metric="%(metric)s"} %(age)f')
return template % dict(
metric = self.metric,
kwds = ','.join([ '%s="%s"' % item for item in self.keywords.items() ]),
value = self.value,
age = data_age
)
else:
series = ['mqtt_data_age{%(kwds)s,metric="%(metric)s"} %(age)f' % dict(
metric = self.metric,
kwds = ','.join([ '%s="%s"' % item for item in self.keywords.items() ]),
age = data_age
)]
if self.expire is None or data_age < self.expire:
for known_val in self.known_vals:
# generate one time series for each known value, where the value is 1
# for the current value and 0 for all else
series.append('%(metric)s{%(kwds)s} %(value)f' % dict(
metric = self.metric,
kwds = ','.join([ '%s="%s"' % item for item in dict(self.keywords, **{self.metric: known_val}).items() ]),
value = int(known_val == self.value)
))
return "\n".join(series)
metrics = defaultdict(Topic)
@app.route("/")
def http_index():
return redirect("/metrics", code=302)
@app.route("/metrics")
def http_metrics():
content = [str(metric)
for metric in metrics.values()
if not metric.ignore and not metric.forget
]
return Response('\n'.join(content + ['']), mimetype="text/plain")
def on_message(client, userdata, message):
topic = message.topic
try:
payload = message.payload.decode("utf-8")
except:
logging.warning("Payload for '%s' is not valid utf-8, ignored" % topic, exc_info=True)
else:
payload = payload.strip()
logging.info("Message received: %s => %s", topic, payload)
if payload[0] == "{" and payload[-1] == "}":
try:
json_message = json.loads(payload)
except ValueError:
# payload is not json, do a standard update
logging.warning("Failed to parse json value for '%s'", topic, exc_info=True)
else:
def _flatten(into_result, prefix, val):
if isinstance(val, dict):
for inner_key, inner_val in val.items():
_flatten(into_result, prefix + [inner_key], inner_val)
elif isinstance(val, list):
for idx, elem in enumerate(val):
_flatten(into_result, prefix + [str(idx)], elem)
elif val is not None:
into_result["/".join(prefix)] = val
return into_result
for key, val in _flatten({}, prefix=[], val=json_message).items():
key_topic = "{}/{}".format(topic, key)
metrics[key_topic].update(key_topic, val)
return
try:
metrics[topic].update(topic, payload)
except:
logging.warning("Metric update for '%s' failed", topic, exc_info=True)
def main():
client = mqtt.Client(config["mqtt"]["client_id"] % dict(
hostname=socket.gethostname()
))
client.username_pw_set(config["mqtt"]["username"], config["mqtt"]["password"])
client.on_message = on_message
def on_connect(client, userdata, flags, result):
logging.info("subscribing")
for topic in config["mqtt"]["subscribe"]:
client.subscribe(topic)
client.on_connect = on_connect
client.connect(config["mqtt"]["broker"], port=config["mqtt"]["port"])
client.loop_start()
app.debug = False
try:
app.run(host=config["exporter"]["listen"], port=config["exporter"]["port"])
except KeyboardInterrupt:
print("exiting")
client.disconnect()
client.loop_stop()
if __name__ == '__main__':
main()