-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathts_line_protocol.lua
284 lines (244 loc) · 11.6 KB
/
ts_line_protocol.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
-- This Source Code Form is subject to the terms of the Mozilla Public
-- License, v. 2.0. If a copy of the MPL was not distributed with this
-- file, You can obtain one at http://mozilla.org/MPL/2.0/.
--[=[
Provides functions to convert full Heka message contents to line protocol for
InfluxDB HTTP write API (new in InfluxDB v0.9.0) or Graphite/Carbon and
any other time series data store that this functionality makes sense for.
Optionally includes all standard message fields as tags or fields and
iterates through all of the dynamic fields to add as measurements, skipping
any fields explicitly omitted using the `skip_fields` config option. It can
also map any Heka message fields as tags in the request sent to the InfluxDB
write API, using the `tag_fields` config option. All dynamic fields in the
Heka message are converted to separate points separated by newlines that are
submitted to InfluxDB, unless source_value_field is defined.
API
^^^
**carbon_line_msg(config)**
Wrapper function that calls others within this module and the field_util
module to generate a table of Carbon line protocol messages that are
derived from the dynamic fields in a Heka message. Base fields or
dynamic fields can be used in the metric name portion of the message.
Configuration is implemented in the encoders that utilize this module.
*Arguments*
- config (table or nil)
Table of config option overrides that come from the client
of this API. Defaults for this module are defined within
the set_config function and clients implementing this API
can reference it for available options.
*Return*
A table of Carbon line protocol messages ready to be sent to
a Carbon server after being looped through in an encoder implementing
this API.
**influxdb_line_msg(config)**
Wrapper function that calls others within this module and the field_util
module to generate a table of InfluxDB line protocol messages that are
derived from the base or dynamic fields in a Heka message. Base fields or
dynamic fields can be used in the metric name portion of the message and
included as tags if desired. Configuration is implemented in the encoders
that utilize this module.
*Arguments*
- config (table or nil)
Table of config option overrides that come from the client
of this API. Defaults for this module are defined within
the set_config function and clients implementing this API
can reference it for available options.
*Return*
A table of InfluxDB line protocol messages ready to be sent to
an InfluxDB server after being looped through in an encoder
implementing this API.
**set_config(client_config)**
This function takes a table of configuration options that override the
defaults defined within it. Each option present in the client table
replaces the corresponding default in `module_config`. Public functions
exposed by the field_util module are then called to populate the tables
and variables related to base fields, tagging fields and skipping fields
kept within the module_config table, which is finally returned to the
client calling this API. Clients utilizing this API must call this
function first to set the configuration for subsequent calls to the
other public functions that this module exposes.
*Arguments*
- client_config (table or nil)
Table of configuration option overrides with the keys being
the option names and the values being the option values.
*Return*
A table of configuration options that can then be passed to other
public functions that this API exposes when calling them.
--]=]
local decode_message = decode_message
local interp = require "msg_interpolate"
local ipairs = ipairs
local field_util = require "field_util"
local math = require "math"
local read_config = read_config
local read_message = read_message
local pairs = pairs
local string = require "string"
local table = require "table"
local tostring = tostring
local type = type
-- Module table: it becomes the environment of this chunk (see setfenv below)
-- and is the value returned at the end of the file.
local M = {}
-- After this call, global reads and writes in this chunk resolve against M
-- rather than the real global table. Only the names captured as locals above
-- remain reachable, and every non-local function defined below is stored in M.
setfenv(1, M) -- Remove external access to contain everything in the module.
-- Escapes a key or value for use in an InfluxDB line protocol measurement
-- name or tag: the input is stringified with tostring() and every space and
-- comma is prefixed with a backslash.
--
-- value: any value; non-strings are converted with tostring() first.
--
-- Returns exactly one value: the escaped string.
local function influxdb_kv_fmt(value)
    -- gsub returns the new string AND the substitution count. The outer
    -- parentheses truncate the result to the string alone, so the count can
    -- never leak into a caller's argument list.
    return (tostring(value):gsub("([ ,])", "\\%1"))
end
-- Builds the data points and the tags for the Heka message currently being
-- processed.
--
-- config: configuration table. The keys read here are name_prefix,
--         interp_name_prefix, name_prefix_delimiter, used_tag_fields,
--         skip_fields, skip_fields_str, carbon_format, tag_fields_all,
--         tag_fields_all_base and source_value_field.
--
-- Returns two tables:
--   points - maps a metric name to the value of a dynamic message field
--   tags   - array of "key=value" strings (left empty when carbon_format
--            is set)
-- When the decoded message has no Fields entry, points is returned empty so
-- callers can always iterate over it.
local function points_tags_tables(config)
    local name_prefix = config.name_prefix or ""
    if config.interp_name_prefix then
        name_prefix = interp.interpolate_from_msg(name_prefix)
    end
    local name_prefix_delimiter = config.name_prefix_delimiter or ""
    local used_tag_fields = config.used_tag_fields
    local skip_fields = config.skip_fields
    local carbon_format = config.carbon_format
    local tag_all = config.tag_fields_all
    local tag_all_base = config.tag_fields_all_base

    -- Initialize the tags table, including base field tag values in
    -- list if the magic **all** or **all_base** config values are
    -- defined.
    local tags = {}
    if not carbon_format and (tag_all or tag_all_base or used_tag_fields) then
        for field in pairs(field_util.base_fields_tag_list) do
            if tag_all or tag_all_base or used_tag_fields[field] then
                local value = read_message(field)
                table.insert(tags, influxdb_kv_fmt(field).."="..tostring(influxdb_kv_fmt(value)))
            end
        end
    end

    -- Initialize the table of data points and populate it with data
    -- from the Heka message. When skip_fields includes "**all_base**",
    -- only dynamic fields are included as InfluxDB data points, while
    -- the base fields serve as tags for them. If skip_fields does not
    -- define any base fields, they are added to the fields of each data
    -- point and each dynamic field value is set as the "value" field.
    -- Setting skip_fields to "**all_base**" is recommended to avoid
    -- redundant data being stored in each data point (the base fields
    -- as fields and as tags).
    local points = {}
    local msg = decode_message(read_message("raw"))
    if not msg.Fields then
        -- No dynamic fields: return an empty points table (previously the
        -- number 0, which made the callers' pairs() call raise).
        return points, tags
    end

    for _, field_entry in ipairs(msg.Fields) do
        local field = field_entry["name"]

        -- A field may carry several values; only the last one is kept.
        local value
        for _, field_value in ipairs(field_entry["value"]) do
            value = field_value
        end

        -- Replace non-word characters with underscores for Carbon
        -- to avoid periods resulting in extraneous directories
        if carbon_format then
            field = field:gsub("[^%w]", "_")
        end

        -- Include the dynamic fields as tags if they are defined in
        -- configuration or the magic value "**all**" is defined.
        -- The lookup uses the current field name; the previous code indexed
        -- with an undefined variable, so configured dynamic tag fields
        -- were never matched.
        if not carbon_format
            and (tag_all or (used_tag_fields and used_tag_fields[field])) then
            table.insert(tags, influxdb_kv_fmt(field).."="..tostring(influxdb_kv_fmt(value)))
        end

        if config.source_value_field and field == config.source_value_field then
            -- The designated source field supplies the value of the metric
            -- named by name_prefix alone.
            points[name_prefix] = value
        -- Only add fields that are not requested to be skipped
        elseif not config.skip_fields_str
            or (skip_fields and not skip_fields[field]) then
            -- Set the name attribute of this table by concatenating
            -- name_prefix with the name of this particular field
            points[name_prefix..name_prefix_delimiter..field] = value
        end
    end

    return points, tags
end
--[[ Public Interface]]
-- Builds the Carbon plaintext lines ("<name> <value> <timestamp>") for the
-- dynamic fields of the current Heka message.
--
-- config: configuration table; config.timestamp_precision is read here and
--         the whole table is handed to points_tags_tables.
--
-- Returns an array of strings, one per numeric data point. Non-numeric
-- values are skipped because Carbon only stores numbers.
function carbon_line_msg(config)
    local api_message = {}
    local message_timestamp = field_util.message_timestamp(config.timestamp_precision)
    local points = points_tags_tables(config)

    -- Guard against a non-table result (e.g. a message without dynamic
    -- fields) instead of letting pairs() raise.
    if type(points) ~= "table" then
        return api_message
    end

    for name, value in pairs(points) do
        local is_metric
        if type(value) == "number" then
            -- Real numbers are accepted directly, so negative values and
            -- values that stringify with an exponent are no longer dropped.
            -- NaN and +/-infinity are excluded.
            is_metric = value == value
                and value ~= math.huge
                and value ~= -math.huge
        else
            -- Other values are accepted only when their string form is a
            -- well-formed decimal such as "12", "-3.5", "7." or ".25".
            value = tostring(value)
            is_metric = string.match(value, "^%-?%d+%.?%d*$") ~= nil
                or string.match(value, "^%-?%.%d+$") ~= nil
        end

        if is_metric then
            table.insert(api_message, name.." "..value.." "..message_timestamp)
        end
    end

    return api_message
end
-- Builds the InfluxDB line protocol entries for the dynamic fields of the
-- current Heka message.
--
-- config: configuration table; timestamp_precision, decimal_precision and
--         value_field_key are read here and the whole table is handed to
--         points_tags_tables.
--
-- Returns an array of strings, one per data point, shaped as
--   <name>[,<tag>=<value>...] <value_field_key>=<value> <timestamp>
function influxdb_line_msg(config)
    local api_message = {}
    local message_timestamp = field_util.message_timestamp(config.timestamp_precision)
    local points, tags = points_tags_tables(config)

    -- Guard against a non-table result (e.g. a message without dynamic
    -- fields) instead of letting pairs() raise.
    if type(points) ~= "table" then
        return api_message
    end

    -- Always send numbers as formatted floats, so InfluxDB will accept
    -- them if they happen to change from ints to floats between
    -- points in time. The decimal_precision config option controls the
    -- number of digits printed after the decimal point.
    local number_fmt = "%."..config.decimal_precision.."f"

    -- The tag section is identical for every point of this message, so it
    -- is joined once. It stays nil when there are no tags.
    local tag_str
    if tags and #tags > 0 then
        tag_str = table.concat(tags, ",")
    end

    for name, value in pairs(points) do
        local value_type = type(value)
        if value_type == "number" then
            value = string.format(number_fmt, value)
        elseif value_type == "string" then
            -- Wrap in double quotes and escape embedded double quotes
            -- as defined by the protocol.
            local escaped = value:gsub('"', '\\"')
            value = '"'..escaped..'"'
        else
            -- Booleans and other types are written in their tostring()
            -- form. They used to reach string.match() and raise
            -- "string expected".
            value = tostring(value)
        end

        -- Format the line differently based on the presence of tags.
        -- The line is held in a local so nothing is written into the
        -- module environment.
        local line
        if tag_str then
            line = string.format("%s,%s %s=%s %d", influxdb_kv_fmt(name), tag_str,
                                 config.value_field_key, value, message_timestamp)
        else
            line = string.format("%s %s=%s %d", influxdb_kv_fmt(name),
                                 config.value_field_key, value, message_timestamp)
        end
        table.insert(api_message, line)
    end

    return api_message
end
-- Merges client supplied options over this module's defaults and derives
-- the lookup tables used by the message building functions.
--
-- client_config: table of option overrides keyed by option name, or nil to
--                use the defaults only. An override whose value is nil or
--                false leaves the default in place; keys that are not
--                listed in the defaults below are ignored.
--
-- Returns the resulting configuration table. Besides the options below it
-- carries interp_name_prefix, plus skip_fields / skip_fields_all_base /
-- skip_fields_all / used_base_fields when skip_fields_str is set, and
-- used_tag_fields / tag_fields_all_base / tag_fields_all when
-- tag_fields_str is set.
function set_config(client_config)
    -- The API documents nil as a valid argument; treat it as "no overrides"
    -- instead of raising on the index below.
    client_config = client_config or {}

    -- Initialize table with default values for ts_line_protocol module
    local module_config = {
        carbon_format = false,
        decimal_precision = "6",
        name_prefix = false,
        name_prefix_delimiter = false,
        skip_fields_str = false,
        source_value_field = false,
        tag_fields_str = "**all_base**",
        timestamp_precision = "ms",
        value_field_key = "value"
    }

    -- Update module_config defaults with those found in client configs
    for option in pairs(module_config) do
        local override = client_config[option]
        if override then
            module_config[option] = override
        end
    end

    -- Remove blacklisted fields from the set of base fields that we use, and
    -- create a table of dynamic fields to skip.
    if module_config.skip_fields_str then
        module_config.skip_fields,
        module_config.skip_fields_all_base,
        module_config.skip_fields_all = field_util.field_map(module_config.skip_fields_str)
        module_config.used_base_fields = field_util.used_base_fields(module_config.skip_fields)
    end

    -- Create and populate a table of fields to be used as tags
    if module_config.tag_fields_str then
        module_config.used_tag_fields,
        module_config.tag_fields_all_base,
        module_config.tag_fields_all = field_util.field_map(module_config.tag_fields_str)
    end

    -- Cache whether or not name_prefix needs interpolation, i.e. whether it
    -- contains a "%{...}" placeholder.
    module_config.interp_name_prefix = false
    if module_config.name_prefix
        and string.find(module_config.name_prefix, "%%{[%w%p]-}") then
        module_config.interp_name_prefix = true
    end

    return module_config
end
-- Return the module table, which holds the non-local functions defined above.
return M