-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcontroller.lua
187 lines (160 loc) · 5.96 KB
/
controller.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
--- The controller where all the required components are linked to enable the library.
local controller = {}
local poller = require("resty.dynacode.poller")
local fetcher = require("resty.dynacode.fetch")
local compiler = require("resty.dynacode.compiler")
local runner = require("resty.dynacode.runner")
local opts = require("resty.dynacode.opts")
local validator = require("resty.dynacode.validator")
local cache = require("resty.dynacode.cache")
local event_emitter = require("resty.dynacode.event_emitter")
local json = require("cjson.safe")
controller.workers_max_jitter = 0
controller.plugin_api_poll_at_init = false
controller.plugin_api_uri = nil
controller.plugin_api_polling_interval = 30 -- seconds
controller.plugin_api_timeout = 5 -- seconds
controller.regex_options = "o" -- compile and cache https://github.com/openresty/lua-nginx-module#ngxrematch
controller.ready = false
controller.shm = nil
controller.events = event_emitter
function controller.logger(msg)
ngx.log(ngx.ERR, string.format("[dynacode phase=%s] %s", ngx.get_phase(), msg))
end
controller.validation_rules = {
validator.present_string("shm"),
validator.present_string("plugin_api_uri"),
}
function controller.setup(opt)
local ok, err = validator.valid(controller.validation_rules, opt)
if not ok then
controller.events.emit(controller.events.ON_ERROR, 'validation', err)
controller.logger(err)
return false, err
end
opts.merge(controller, opt)
-- caching setup
ok, err = cache.setup({
logger = controller.logger,
now = ngx.time,
ttl = controller.plugin_api_polling_interval * 0.9,
ngx_shared = ngx.shared[controller.shm],
})
if not ok then
controller.logger(string.format("it was not possible to setup the cache due to %s", err))
controller.events.emit(controller.events.ON_ERROR, 'setup', err)
return false
end
-- compiler setup
ok, err = compiler.setup({
events = controller.events,
logger = controller.logger,
})
if not ok then
controller.logger(string.format("it was not possible to setup the compiler due to %s", err))
controller.events.emit(controller.events.ON_ERROR, 'setup', err)
return false
end
-- runner setup
ok, err = runner.setup({
logger = controller.logger,
regex_options = controller.regex_options,
events = controller.events,
})
if not ok then
controller.logger(string.format("it was not possible to setup the runner due to %s", err))
controller.events.emit(controller.events.ON_ERROR, 'setup', err)
return false
end
-- api fetch setup
ok, err = fetcher.setup({
plugin_api_uri = controller.plugin_api_uri,
plugin_api_timeout = controller.plugin_api_timeout,
events = controller.events,
})
if not ok then
controller.logger(string.format("it was not possible to setup the fetch api due to %s", err))
controller.events.emit(controller.events.ON_ERROR, 'setup', err)
return false
end
-- poller setup
ok, err = poller.setup({
interval = controller.plugin_api_polling_interval,
workers_max_jitter = controller.workers_max_jitter,
callback = controller.recurrent_function,
start_right_away = controller.plugin_api_poll_at_init,
logger = controller.logger,
})
if not ok then
controller.logger(string.format("it was not possible to setup the poller due to %s", err))
controller.events.emit(controller.events.ON_ERROR, 'setup', err)
return false
end
controller.ready = true
return true
end
function controller.recurrent_function()
local ok, err = pcall(controller._recurrent_function)
if not ok then
controller.events.emit(controller.events.ON_ERROR, 'poll', err)
end
end
function controller._recurrent_function()
local response, err
if cache.should_refresh() then
response, err = fetcher.request_api()
if not response then
controller.logger(string.format("it was not possible to request due to %s", err))
controller.events.emit(controller.events.BG_DIDNT_UPDATE_PLUGINS)
return
end
cache.set(response)
controller.events.emit(controller.events.BG_CACHE_MISS)
else
response = cache.get()
controller.events.emit(controller.events.BG_CACHE_HIT)
end
if not response or response == "" then
controller.logger("the cache was empty")
controller.events.emit(controller.events.BG_DIDNT_UPDATE_PLUGINS)
return
end
local table_response
table_response, err = json.decode(response)
if table_response == nil or json.null == table_response or type(table_response) ~= "table" or err ~= nil then
controller.logger(string.format("the api response was invalid (empty, null, not expected) err=%s", err))
controller.events.emit(controller.events.BG_DIDNT_UPDATE_PLUGINS)
return
end
local errors = compiler.compile(table_response)
for _, e in ipairs(errors) do
controller.logger(string.format("[warn] %s", e))
end
-- TODO: validate plugins minimal expected structure to not override current with invalid api response
-- saving/updating the copy locally per worker
controller.plugins = runner.phasify_plugins(table_response)
controller.events.emit(controller.events.BG_UPDATED_PLUGINS)
end
--- It runs the proper task, if it's the init_phase it's going to fire the pollers
-- if it's other phases it's going to select the proper plugins and run them.
function controller.run()
local ok, err = pcall(controller._run)
if not ok then
controller.events.emit(controller.events.ON_ERROR, 'general_run', err)
controller.logger(string.format("there was an general error during the run err = %s", err))
end
end
function controller._run()
if not controller.ready then
controller.logger("[NOT_READY] no functions are running, you must properly setup dynacode.setup(opts)")
return
end
local phase = ngx.get_phase()
if phase == "init_worker" then
controller.logger(string.format("running the poller for worker pid=%d", ngx.worker.pid()))
poller.run()
return
end
runner.run(controller.plugins, ngx.var.host, ngx.get_phase())
end
return controller