-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlflow.lua
70 lines (61 loc) · 1.86 KB
/
lflow.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package.path = package.path .. ";;;./lflow/Toribio/?.lua;./lflow/Toribio/Lumen/?.lua"
local log = require 'log'
log.setlevel('INFO', 'LFLOW')
log.setlevel('ALL', 'TORIBIO')
--require "log".setlevel('ALL', 'TORIBIO')
local sched = require "sched"
local lflow = require 'lflow/init'
local loadfile = require 'lib/compat_env'.loadfile
local filename = _G.arg[1] --'lflow/test.lflow'
local ffilters = {}
local function filters_stop ()
sched.run( function()
log('LFLOW', 'DETAIL', 'Stopping flow')
--sched.run(function() sched.signal(fevents['lflow_run'], false) end)
lflow.stop()
for fname, f in pairs(ffilters)do
log('LFLOW', 'DEBUG', 'Pausing "%s" (%s)', fname, tostring(f.taskd))
f.taskd:set_pause(true)
end
end)
end
local function filters_start ()
sched.run( function()
log('LFLOW', 'DETAIL', 'Starting flow')
for fname, f in pairs(ffilters)do
log('LFLOW', 'DEBUG', 'Unpausing "%s" (%s)', fname, tostring(f.taskd))
f.taskd:set_pause(false)
end
--sched.run(function() sched.signal(fevents['lflow_run'], true) end)
lflow.start()
end)
end
if filename then
local file=assert(io.open(filename, 'r'))
local linenumber = 0
for line in file:lines() do
linenumber=linenumber+1
--line = line:gsub('%s+', '') --remove whitespaces
if line~=string.rep(' ', #line) and not line:find('^%s*#') then
--print ('line:', line)
local filter, err = lflow.parse_line(line)
if filter then
ffilters['filter: '..linenumber] = filter
else
io.stderr:write('lflow: '..filename..':'..linenumber..': '..tostring(err)..'\n')
end
end
end
filters_start()
--[[
sched.run(function()
while true do
sched.sleep(1)
for k, v in pairs(lflow.fevents) do
print (k, v.emitted, v.caught)
end
end
end)
--]]
end
sched.loop()