diff --git a/etc/emq_plugin_2db.config b/etc/emq_plugin_2db.config index 9c763b3..1b61680 100644 --- a/etc/emq_plugin_2db.config +++ b/etc/emq_plugin_2db.config @@ -1,26 +1,40 @@ + [ {emq_plugin_2db, [ - {odbc, ["mysqlserver","test","test"] }, + {odbc, ["localmssql","test","test"] }, {topics , [ - {<<"sgcd/solvent-live">>,1}, - {<<"sgcd/solvent-status">>,2} + {<<"plant1/GasSensors">>,1}, + {<<"plant1/Machine1">>,2}, + {<<"plant1/Machine1Alarm">>,3}, + {<<"plant1/TempHumi">>,4}, + {<<"bu1/location/temphumisensors">>,5} ] }, {reqlist, [ [<<"ppm">>,<<"temp">>,<<"suid">>], - [<<"ppm1">>,<<"temp1">>,<<"suid1">>] + [<<"ts">>, <<"suid">>, <<"SupAirRH">>, <<"ProcInRH">>, <<"ProcInAirTemp">>, <<"ProcOutAirTemp">>, <<"ReactInTemp">>, <<"ReactOutTemp">>], + [<<"ts">>, <<"suid">>, <<"AlarmType">>, <<"AlarmStatus">>], + [<<"ts">>, <<"suid">>, <<"temp">>, <<"humi">>], + [<<"ts">>,<<"suid">>,<<"temp">>,<<"humi">>] ] - }, - + {que1 , [ - "INSERT INTO [Marposs].[dbo].[sgcdSolventL1] ( [ts] ,[suid] ,[ppm] ,[temp]) VALUES( ? ,? , ? , ?)", - "INSERT INTO [Marposs].[dbo].[sgcdSolventL1] ( [ts] ,[suid] ,[ppm] ,[temp]) VALUES( ? ,? , ? , ?)" + "INSERT INTO [Marposs].[dbo].[plant1GasSensors] ( [ts] ,[suid] ,[ppm] ,[temp]) VALUES( ? ,? , ? , ?)", + + "INSERT INTO [Marposs].[dbo].[plant1Machine1] ([ts], [suid], [SupAirRH], [ProcInRH], [ProcInAirTemp], + [ProcOutAirTemp], [ReactInTemp], [ReactOutTemp] ) VALUES(?,?,?,?,?,?,?,?)", + + "INSERT INTO [Marposs].[dbo].[plant1Machine1Alarm] ([ts], [suid], [AlarmType], [AlarmStatus] ) VALUES(?,?,?,?)", + + "INSERT INTO [Marposs].[dbo].[plant1TempHumi] ([ts], [suid], [temp], [humi] ) VALUES(?,?,?,?)", + + "INSERT INTO [DMS].[dbo].[bu1locationtemphumisensors] ( [ts] ,[suid] ,[temp] ,[humi]) VALUES( ? ,? , ? , ?)" ] }, @@ -28,18 +42,45 @@ {que2, [ - "[ {{sql_varchar, 80}, [emq_plugin_2db:try_get_ts(TimestampStr, MessageMaps)] }, + "[ {{sql_varchar, 80}, [emq_plugin_2db:try_get_ts(Message, MessageMaps)] }, {sql_integer , [emq_plugin_2db:try_get_val(<<\"suid\">>, MessageMaps,\"int\")] }, - {sql_integer , [emq_plugin_2db:try_get_val(<<\"temp\">>, MessageMaps,\"int\")] }, - {sql_integer , [emq_plugin_2db:try_get_val(<<\"ppm\">>, MessageMaps,\"int\")] } + {sql_integer , [emq_plugin_2db:try_get_val(<<\"ppm\">>, MessageMaps,\"int\")] }, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"temp\">>, MessageMaps,\"int\")] } + ]\.", - - "[ {{sql_varchar, 80}, [emq_plugin_2db:try_get_ts(TimestampStr, MessageMaps)] }, - {sql_integer , [emq_plugin_2db:try_get_val(<<\"suid1\">>, MessageMaps,\"int\")]}, - {sql_integer , [emq_plugin_2db:try_get_val(<<\"temp1\">>, MessageMaps,\"int\")]}, - {sql_integer , [emq_plugin_2db:try_get_val(<<\"ppm1\">>, MessageMaps,\"int\")]} + + + "[ {{sql_varchar, 80}, [emq_plugin_2db:try_get_ts(Message, MessageMaps)] }, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"suid\">>, MessageMaps,\"int\")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"SupAirRH\">>, MessageMaps,\"int\")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"ProcInRH\">>, MessageMaps,\"int\")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"ProcInAirTemp\">>, MessageMaps,\"int\")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"ProcOutAirTemp\">>, MessageMaps,\"int\")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"ReactInTemp\">>, MessageMaps,\"int\")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"ReactOutTemp\">>, MessageMaps,\"int\")]} + ]\.", + + "[ {{sql_varchar, 80}, [emq_plugin_2db:try_get_ts(Message, MessageMaps)] }, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"suid\">>, MessageMaps,\"int\")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"AlarmType\">>, MessageMaps,\"int\")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"AlarmStatus\">>, MessageMaps,\"int\")]} + ]\.", + + "[ {{sql_varchar, 80}, [emq_plugin_2db:try_get_ts(Message, MessageMaps)] }, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"suid\">>, MessageMaps,\"int\")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"temp\">>, MessageMaps,\"int\")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"humi\">>, MessageMaps,\"int\")]} + ]\.", + + "[ {{sql_varchar, 80}, [emq_plugin_2db:try_get_ts(Message, MessageMaps)] }, + {{sql_varchar, 80}, [emq_plugin_2db:try_get_val(<<\"suid\">>, MessageMaps,\"string\")] }, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"temp\">>, MessageMaps,\"int\")] }, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"humi\">>, MessageMaps,\"int\")] } + ]\." + + ] } ]} diff --git a/src/emq_plugin_2db.erl b/src/emq_plugin_2db.erl index 09384c7..9ad21e1 100644 --- a/src/emq_plugin_2db.erl +++ b/src/emq_plugin_2db.erl @@ -114,13 +114,28 @@ write(Odbc,NamePid,Que1,Que2) -> end. -try_get_ts(TimestampStr,MessageMaps) -> +try_get_ts(Message,MessageMaps) -> case maps:is_key(<<"ts">> ,MessageMaps) of true-> - binary_to_list(maps:get(<<"ts">> ,MessageMaps)); + TimeBin = maps:get(<<"ts">> ,MessageMaps), + case is_binary(TimeBin) of + true -> binary_to_list(TimeBin); + false -> lists:flatten(io_lib:format("~p", [TimeBin])); + undefined ->lists:flatten(io_lib:format("~p", [TimeBin])) + end; false-> - TimestampStr; + Timestamp1 = lists:flatten(io_lib:format("~p", [element(1, element(13, Message))])), + Timestamp2 = lists:flatten(io_lib:format("~p", [element(2, element(13, Message))])), + Timestamp3 = lists:flatten(io_lib:format("~p", [element(3, element(13, Message))])), + Timetemp = string:concat(Timestamp1,Timestamp2), + TimestampStr = string:concat(Timetemp,Timestamp3), + TimestampStr; undefined -> + Timestamp1 = lists:flatten(io_lib:format("~p", [element(1, element(13, Message))])), + Timestamp2 = lists:flatten(io_lib:format("~p", [element(2, element(13, Message))])), + Timestamp3 = lists:flatten(io_lib:format("~p", [element(3, element(13, Message))])), + Timetemp = string:concat(Timestamp1,Timestamp2), + TimestampStr = string:concat(Timetemp,Timestamp3), TimestampStr end. @@ -168,9 +183,7 @@ on_message_publish(Message, Odbc, Topics,ReqkeysList,Que1s,Que2s) -> MessageMaps = jsx:decode(MessageBin, [return_maps]), case lists:any(fun (Elem) -> lists:member(Elem, lists:nth(Index,ReqkeysList)) end, maps:keys(MessageMaps) ) of true-> - Timestamp1 = lists:flatten(io_lib:format("~p", [element(1, element(13, Message))])), - Timestamp2 = lists:flatten(io_lib:format("~p", [element(2, element(13, Message))])), - Timestamp3 = lists:flatten(io_lib:format("~p", [element(3, element(13, Message))])), + UsernameBin = element(2, element(4, Message)), ClientBin = element(1,element(4, Message)), ClientBinRe = re:replace(ClientBin, "\\s+", "", [global,{return,binary}]), @@ -180,8 +193,7 @@ on_message_publish(Message, Odbc, Topics,ReqkeysList,Que1s,Que2s) -> - Timetemp = string:concat(Timestamp1,Timestamp2), - TimestampStr = string:concat(Timetemp,Timestamp3), + Que1 = lists:nth(Index,Que1s), @@ -189,7 +201,7 @@ on_message_publish(Message, Odbc, Topics,ReqkeysList,Que1s,Que2s) -> {ok, Tokens, _} = erl_scan:string(TQue2), {ok, Parsed} = erl_parse:parse_exprs(Tokens), - Bindings = [{'TimestampStr', TimestampStr}, {'MessageMaps', MessageMaps}], + Bindings = [{'Message', Message}, {'MessageMaps', MessageMaps}], {value, Que2, _} = erl_eval:exprs(Parsed, Bindings), % io:format("Que1.....~p~n ", [Que1]), % io:format("Que2.....~p~n ", [Que2]),