Skip to content

Commit

Permalink
"Updated Timestamp and config parser
Browse files Browse the repository at this point in the history
  • Loading branch information
arvindh123 committed Apr 29, 2019
1 parent 62d0866 commit e15be43
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 25 deletions.
73 changes: 57 additions & 16 deletions etc/emq_plugin_2db.config
Original file line number Diff line number Diff line change
@@ -1,45 +1,86 @@


[
{emq_plugin_2db, [
{odbc, ["mysqlserver","test","test"] },
{odbc, ["localmssql","test","test"] },
{topics ,
[
{<<"sgcd/solvent-live">>,1},
{<<"sgcd/solvent-status">>,2}
{<<"plant1/GasSensors">>,1},
{<<"plant1/Machine1">>,2},
{<<"plant1/Machine1Alarm">>,3},
{<<"plant1/TempHumi">>,4},
{<<"bu1/location/temphumisensors">>,5}
]
},

{reqlist,
[
[<<"ppm">>,<<"temp">>,<<"suid">>],
[<<"ppm1">>,<<"temp1">>,<<"suid1">>]
[<<"ts">>, <<"suid">>, <<"SupAirRH">>, <<"ProcInRH">>, <<"ProcInAirTemp">>, <<"ProcOutAirTemp">>, <<"ReactInTemp">>, <<"ReactOutTemp">>],
[<<"ts">>, <<"suid">>, <<"AlarmType">>, <<"AlarmStatus">>],
[<<"ts">>, <<"suid">>, <<"temp">>, <<"humi">>],
[<<"ts">>,<<"suid">>,<<"temp">>,<<"humi">>]
]

},

{que1 ,
[
"INSERT INTO [Marposs].[dbo].[sgcdSolventL1] ( [ts] ,[suid] ,[ppm] ,[temp]) VALUES( ? ,? , ? , ?)",
"INSERT INTO [Marposs].[dbo].[sgcdSolventL1] ( [ts] ,[suid] ,[ppm] ,[temp]) VALUES( ? ,? , ? , ?)"
"INSERT INTO [Marposs].[dbo].[plant1GasSensors] ( [ts] ,[suid] ,[ppm] ,[temp]) VALUES( ? ,? , ? , ?)",

"INSERT INTO [Marposs].[dbo].[plant1Machine1] ([ts], [suid], [SupAirRH], [ProcInRH], [ProcInAirTemp],
[ProcOutAirTemp], [ReactInTemp], [ReactOutTemp] ) VALUES(?,?,?,?,?,?,?,?)",

"INSERT INTO [Marposs].[dbo].[plant1Machine1Alarm] ([ts], [suid], [AlarmType], [AlarmStatus] ) VALUES(?,?,?,?)",

"INSERT INTO [Marposs].[dbo].[plant1TempHumi] ([ts], [suid], [temp], [humi] ) VALUES(?,?,?,?)",

"INSERT INTO [DMS].[dbo].[bu1locationtemphumisensors] ( [ts] ,[suid] ,[temp] ,[humi]) VALUES( ? ,? , ? , ?)"
]

},

{que2,
[

"[ {{sql_varchar, 80}, [emq_plugin_2db:try_get_ts(TimestampStr, MessageMaps)] },
"[ {{sql_varchar, 80}, [emq_plugin_2db:try_get_ts(Message, MessageMaps)] },
{sql_integer , [emq_plugin_2db:try_get_val(<<\"suid\">>, MessageMaps,\"int\")] },
{sql_integer , [emq_plugin_2db:try_get_val(<<\"temp\">>, MessageMaps,\"int\")] },
{sql_integer , [emq_plugin_2db:try_get_val(<<\"ppm\">>, MessageMaps,\"int\")] }
{sql_integer , [emq_plugin_2db:try_get_val(<<\"ppm\">>, MessageMaps,\"int\")] },
{sql_integer , [emq_plugin_2db:try_get_val(<<\"temp\">>, MessageMaps,\"int\")] }

]\.",

"[ {{sql_varchar, 80}, [emq_plugin_2db:try_get_ts(TimestampStr, MessageMaps)] },
{sql_integer , [emq_plugin_2db:try_get_val(<<\"suid1\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"temp1\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"ppm1\">>, MessageMaps,\"int\")]}


"[ {{sql_varchar, 80}, [emq_plugin_2db:try_get_ts(Message, MessageMaps)] },
{sql_integer , [emq_plugin_2db:try_get_val(<<\"suid\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"SupAirRH\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"ProcInRH\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"ProcInAirTemp\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"ProcOutAirTemp\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"ReactInTemp\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"ReactOutTemp\">>, MessageMaps,\"int\")]}
]\.",

"[ {{sql_varchar, 80}, [emq_plugin_2db:try_get_ts(Message, MessageMaps)] },
{sql_integer , [emq_plugin_2db:try_get_val(<<\"suid\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"AlarmType\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"AlarmStatus\">>, MessageMaps,\"int\")]}
]\.",

"[ {{sql_varchar, 80}, [emq_plugin_2db:try_get_ts(Message, MessageMaps)] },
{sql_integer , [emq_plugin_2db:try_get_val(<<\"suid\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"temp\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"humi\">>, MessageMaps,\"int\")]}
]\.",

"[ {{sql_varchar, 80}, [emq_plugin_2db:try_get_ts(Message, MessageMaps)] },
{{sql_varchar, 80}, [emq_plugin_2db:try_get_val(<<\"suid\">>, MessageMaps,\"string\")] },
{sql_integer , [emq_plugin_2db:try_get_val(<<\"temp\">>, MessageMaps,\"int\")] },
{sql_integer , [emq_plugin_2db:try_get_val(<<\"humi\">>, MessageMaps,\"int\")] }

]\."



]
}
]}
Expand Down
30 changes: 21 additions & 9 deletions src/emq_plugin_2db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,28 @@ write(Odbc,NamePid,Que1,Que2) ->

end.

try_get_ts(TimestampStr,MessageMaps) ->
try_get_ts(Message,MessageMaps) ->
case maps:is_key(<<"ts">> ,MessageMaps) of
true->
binary_to_list(maps:get(<<"ts">> ,MessageMaps));
TimeBin = maps:get(<<"ts">> ,MessageMaps),
case is_binary(TimeBin) of
true -> binary_to_list(TimeBin);
false -> lists:flatten(io_lib:format("~p", [TimeBin]));
undefined ->lists:flatten(io_lib:format("~p", [TimeBin]))
end;
false->
TimestampStr;
Timestamp1 = lists:flatten(io_lib:format("~p", [element(1, element(13, Message))])),
Timestamp2 = lists:flatten(io_lib:format("~p", [element(2, element(13, Message))])),
Timestamp3 = lists:flatten(io_lib:format("~p", [element(3, element(13, Message))])),
Timetemp = string:concat(Timestamp1,Timestamp2),
TimestampStr = string:concat(Timetemp,Timestamp3),
TimestampStr;
undefined ->
Timestamp1 = lists:flatten(io_lib:format("~p", [element(1, element(13, Message))])),
Timestamp2 = lists:flatten(io_lib:format("~p", [element(2, element(13, Message))])),
Timestamp3 = lists:flatten(io_lib:format("~p", [element(3, element(13, Message))])),
Timetemp = string:concat(Timestamp1,Timestamp2),
TimestampStr = string:concat(Timetemp,Timestamp3),
TimestampStr
end.

Expand Down Expand Up @@ -168,9 +183,7 @@ on_message_publish(Message, Odbc, Topics,ReqkeysList,Que1s,Que2s) ->
MessageMaps = jsx:decode(MessageBin, [return_maps]),
case lists:any(fun (Elem) -> lists:member(Elem, lists:nth(Index,ReqkeysList)) end, maps:keys(MessageMaps) ) of
true->
Timestamp1 = lists:flatten(io_lib:format("~p", [element(1, element(13, Message))])),
Timestamp2 = lists:flatten(io_lib:format("~p", [element(2, element(13, Message))])),
Timestamp3 = lists:flatten(io_lib:format("~p", [element(3, element(13, Message))])),

UsernameBin = element(2, element(4, Message)),
ClientBin = element(1,element(4, Message)),
ClientBinRe = re:replace(ClientBin, "\\s+", "", [global,{return,binary}]),
Expand All @@ -180,16 +193,15 @@ on_message_publish(Message, Odbc, Topics,ReqkeysList,Que1s,Que2s) ->



Timetemp = string:concat(Timestamp1,Timestamp2),
TimestampStr = string:concat(Timetemp,Timestamp3),



Que1 = lists:nth(Index,Que1s),
TQue2 = lists:nth(Index,Que2s),

{ok, Tokens, _} = erl_scan:string(TQue2),
{ok, Parsed} = erl_parse:parse_exprs(Tokens),
Bindings = [{'TimestampStr', TimestampStr}, {'MessageMaps', MessageMaps}],
Bindings = [{'Message', Message}, {'MessageMaps', MessageMaps}],
{value, Que2, _} = erl_eval:exprs(Parsed, Bindings),
% io:format("Que1.....~p~n ", [Que1]),
% io:format("Que2.....~p~n ", [Que2]),
Expand Down

0 comments on commit e15be43

Please sign in to comment.