diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..907f52c --- /dev/null +++ b/Makefile @@ -0,0 +1,21 @@ +PROJECT = emq_plugin_2db +PROJECT_DESCRIPTION = EMQ Plugin 2 DB +PROJECT_VERSION = 2.3.11 + +BUILD_DEPS = emqttd cuttlefish +dep_emqttd = git https://github.com/emqtt/emqttd master +dep_cuttlefish = git https://github.com/emqtt/cuttlefish v2.0.11 + +ERLC_OPTS += +debug_info +ERLC_OPTS += +'{parse_transform, lager_transform}' + +NO_AUTOPATCH = cuttlefish + +COVER = true + +include erlang.mk + +app:: rebar.config + +app.config:: + ./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emq_plugin_2db.conf -i priv/emq_plugin_2db.schema -d data diff --git a/README.md b/README.md new file mode 100644 index 0000000..7b4b720 --- /dev/null +++ b/README.md @@ -0,0 +1,4 @@ +emq-plugin-2db +=================== + +This Plugin Helps to write to EMQ Published data to DB with ODBC \ No newline at end of file diff --git a/emq_plugin_2db.d b/emq_plugin_2db.d new file mode 100644 index 0000000..004e985 --- /dev/null +++ b/emq_plugin_2db.d @@ -0,0 +1,2 @@ + +COMPILE_FIRST += diff --git a/erlang.mk b/erlang.mk new file mode 100644 index 0000000..8930dfc --- /dev/null +++ b/erlang.mk @@ -0,0 +1 @@ +include ../../erlang.mk diff --git a/etc/emq_plugin_2db.config b/etc/emq_plugin_2db.config new file mode 100644 index 0000000..5426504 --- /dev/null +++ b/etc/emq_plugin_2db.config @@ -0,0 +1,47 @@ + +[ + {emq_plugin_2db, [ + {odbc, ["mysqlserver","test","test"] }, + {topics , + [ + {<<"sgcd/solvent-live">>,1}, + {<<"sgcd/solvent-status">>,2} + ] + }, + + {reqlist, + [ + [<<"ppm">>,<<"temp">>,<<"suid">>], + [<<"ppm1">>,<<"temp1">>,<<"suid1">>] + ] + + }, + + {que1 , + [ + "INSERT INTO [Marposs].[dbo].[sgcdSolventL1] ( [ts] ,[suid] ,[ppm] ,[temp]) VALUES( ? ,? , ? , ?)", + "INSERT INTO [Marposs].[dbo].[sgcdSolventL1] ( [ts] ,[suid] ,[ppm] ,[temp]) VALUES( ? ,? , ? , ?)" + ] + + }, + + {que2, + [ + + "[ {{sql_varchar, 80}, [TimestampStr]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"suid\">>, MessageMaps,\"int\")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"temp\">>, MessageMaps,\"int\")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"ppm\">>, MessageMaps,\"int\")]} + ]\.", + + "[ {{sql_varchar, 80}, [TimestampStr]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"suid1\">>, MessageMaps,\"int\")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"temp1\">>, MessageMaps,\"int\")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"ppm1\">>, MessageMaps,\"int\")]} + ]\." + + ] + } + ]} +]. + diff --git a/etc/emq_plugin_2dbV1.txt b/etc/emq_plugin_2dbV1.txt new file mode 100644 index 0000000..6ecd30d --- /dev/null +++ b/etc/emq_plugin_2dbV1.txt @@ -0,0 +1,40 @@ + +[ + {emq_plugin_2db, [ + {odbc, ["mysql","test","test"] }, + {topics , + [ + <<"sgcd/solvent-live">>, + <<"sgcd/solvent-status">> + ] + }, + + {que1 , + [ + "INSERT INTO [Marposs].[dbo].[sgcdSolventL1] ( [ts] ,[suid] ,[ppm] ,[temp]) VALUES( ? ,? , ? , ?)", + "INSERT INTO [Marposs].[dbo].[sgcdSolventL1] ( [ts] ,[suid1] ,[ppm1] ,[temp1]) VALUES( ? ,? , ? , ?)" + ] + + }, + + {que2, + [ + + "[ {{sql_varchar, 80}, [TimestampStr]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"suid\">>, MessageMaps,\"int\")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"temp\">>, MessageMaps,\"int\")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"ppm\">>, MessageMaps,\"int\")]}\. + ]", + + "[ {{sql_varchar, 80}, [TimestampStr]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"suid1\">>, MessageMaps,\"int\")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"temp1\">>, MessageMaps,\"int\")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<\"ppm1\">>, MessageMaps,\"int\")]}\. + ]" + + ] + } + ]} +]. + + diff --git a/rebar.config b/rebar.config new file mode 100644 index 0000000..9160d8f --- /dev/null +++ b/rebar.config @@ -0,0 +1,4 @@ +{deps, [ + +]}. +{erl_opts, [debug_info,{parse_transform,lager_transform}]}. diff --git a/src/emq_plugin_2b_sup.erl b/src/emq_plugin_2b_sup.erl new file mode 100644 index 0000000..78fb8fb --- /dev/null +++ b/src/emq_plugin_2b_sup.erl @@ -0,0 +1,32 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emq_plugin_2db_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + {ok, { {one_for_one, 0, 1}, []} }. + diff --git a/src/emq_plugin_2db.erl b/src/emq_plugin_2db.erl new file mode 100644 index 0000000..0d66c95 --- /dev/null +++ b/src/emq_plugin_2db.erl @@ -0,0 +1,206 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emq_plugin_2db). + +-include_lib("emqttd/include/emqttd.hrl"). + + +-import(string,[len/1]). + +-export([load/5, unload/0]). + +%% Hooks functions + + +-export([on_message_publish/6]). + +-export([init/0, deinit/0, connect/1, disconnect/0, write/3, try_get_val/3 ]). + +init()-> + + case odbc:start() of + ok -> + io:fwrite("Started ODBC ~n"), + {ok, "Started odbc"}; + + {error,{already_started,odbc}} -> + io:fwrite("Already Connceted ~n"), + {error, "already started odbc"} + end. + +deinit()-> + case odbc:stop() of + ok -> + io:fwrite("Stoped ODBC ~n"), + {ok, "Stopped odbc"} + end. + +connect(Odbc) -> + case whereis(plugconndb1) of + undefined -> + case odbc:connect("DSN="++lists:nth(1,Odbc)++";UID="++lists:nth(2,Odbc)++";PWD="++lists:nth(3,Odbc), []) of + {ok, Pid}-> + try register(plugconndb1, Pid) + catch + error:X -> + io:fwrite(X) + + + end, + io:fwrite("Connected successfully ~n"), + {ok,{"Connected Pid- ", Pid}}; + {error, Value} -> + io:fwrite("~p ~n", [Value]), + if + Value == 'odbc_not_started' -> + case emq_plugin_2db:init() of + {ok,Val} -> + emq_plugin_2db:connect(Odbc) + end; + true-> + io:fwrite("~p ~n", [Value]), + {error,Value} + end + + end; + Pid -> + io:fwrite("Already Connected with PID - ~p~n",[Pid]), + {ok, Pid} + end. + +disconnect() -> + case whereis(plugconndb1) of + undefined -> + {ok, {"there no ODBC plugconndb1"}}; + Pid -> + odbc:disconnect(Pid), + {ok, {"disconnected -", Pid}} + end. + +write(Odbc,Que1,Que2) -> + + case whereis(plugconndb1) of + undefined -> + case emq_plugin_2db:connect(Odbc) of + {ok,Reason} -> + write(Odbc,Que1,Que2); + {error,Reason} -> + {error, Reason} + end; + Pid -> + + case odbc:param_query(Pid,Que1,Que2) of + ResultTuple -> + io:format("odbc:param_query Result of Writing to DB: ~p~n", [ResultTuple]); + {error,Reason} -> + io:format("odbc:param_query Error in Writing to DB: ~p~n", [Reason]) + end + + end. + +try_get_val(Key,MessageMaps,Dt) -> + case maps:is_key(Key ,MessageMaps) of + true-> + maps:get(Key,MessageMaps); + false-> + if + Dt == "int" -> 0; + Dt == "string" -> ""; + true -> "" + end; + undefined -> + if + Dt == "int" -> 0; + Dt == "string" -> ""; + true -> "" + end + end. + +%% Called when the plugin application start +load(Odbc,Topics,ReqkeysList,Que1s,Que2s) -> + emqttd:hook('message.publish', fun ?MODULE:on_message_publish/6, [Odbc,Topics,ReqkeysList,Que1s,Que2s]). + + + +on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Odbc, _Topics, _ReqkeysList, _Que1s, _Que2s) -> + {ok, Message}; + +on_message_publish(Message, Odbc, Topics,ReqkeysList,Que1s,Que2s) -> + io:format("publish ~s~n", [emqttd_message:format(Message)]), + + TopicBin = element(5, Message), + + ReqTopicsBinList = [<<"sgcd/solvent-live">>], + + case lists:keyfind(TopicBin, 1,Topics) of + {Topic, Index} -> + TopicStr = binary_to_list(TopicBin), + MessageBin = element(12, Message), + + case jsx:is_json(MessageBin) of + true -> + MessageMaps = jsx:decode(MessageBin, [return_maps]), + case lists:any(fun (Elem) -> lists:member(Elem, lists:nth(Index,ReqkeysList)) end, maps:keys(MessageMaps) ) of + true-> + Timestamp1 = lists:flatten(io_lib:format("~p", [element(1, element(13, Message))])), + Timestamp2 = lists:flatten(io_lib:format("~p", [element(2, element(13, Message))])), + Timestamp3 = lists:flatten(io_lib:format("~p", [element(3, element(13, Message))])), + UsernameBin = element(2, element(4, Message)), + ClientBin = element(1,element(4, Message)), + ClientStr = binary_to_list(ClientBin), + UsernameStr = binary_to_list(UsernameBin), + + + + Timetemp = string:concat(Timestamp1,Timestamp2), + TimestampStr = string:concat(Timetemp,Timestamp3), + + + Que1 = lists:nth(Index,Que1s), + TQue2 = lists:nth(Index,Que2s), + + {ok, Tokens, _} = erl_scan:string(TQue2), + {ok, Parsed} = erl_parse:parse_exprs(Tokens), + Bindings = [{'TimestampStr', TimestampStr}, {'MessageMaps', MessageMaps}], + {value, Que2, _} = erl_eval:exprs(Parsed, Bindings), + io:format("Que1.....~p~n ", [Que1]), + io:format("Que2.....~p~n ", [Que2]), + + write(Odbc,Que1,Que2), + {ok, Message}; + false-> + {error, "no required Key in JSON"} + end; + false -> + {error, "not json string"}; + + undefined -> + {error, "dont'know, undefined"} + end; + false-> + {error ,"Topic Not mactched"}; + undefined -> + {error ,"Topic undefined"} + end. + + + + +%% Called when the plugin application stop +unload() -> + emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2). + diff --git a/src/emq_plugin_2db1erl b/src/emq_plugin_2db1erl new file mode 100644 index 0000000..cddcd07 --- /dev/null +++ b/src/emq_plugin_2db1erl @@ -0,0 +1,207 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emq_plugin_2db). + +-include_lib("emqttd/include/emqttd.hrl"). + + +-import(string,[len/1]). + +-export([load/2, unload/0]). + +%% Hooks functions + + +-export([on_message_publish/2]). + +-export([init/0, deinit/0, connect/0, disconnect/0, write/2, try_get_val/3 ]). + +init()-> + + case odbc:start() of + ok -> + io:fwrite("Started ODBC ~n"), + {ok, "Started odbc"}; + + {error,{already_started,odbc}} -> + io:fwrite("Already Connceted ~n"), + {error, "already started odbc"} + end. + +deinit()-> + case odbc:stop() of + ok -> + io:fwrite("Stoped ODBC ~n"), + {ok, "Stopped odbc"} + end. + +connect() -> + case whereis(plugconndb1) of + undefined -> + case odbc:connect("DSN=SGMainSQL;UID=CRMS;PWD=dev@web31", []) of + {ok, Pid}-> + try register(plugconndb1, Pid) + catch + error:X -> + io:fwrite(X) + + + end, + io:fwrite("Connected successfully ~n"), + {ok,{"Connected Pid- ", Pid}}; + {error, Value} -> + io:fwrite("~p ~n", [Value]), + if + Value == 'odbc_not_started' -> + case emq_plugin_2db:init() of + {ok,Val} -> + emq_plugin_2db:connect() + end; + true-> + io:fwrite("~p ~n", [Value]), + {error,Value} + end + + end; + Pid -> + io:fwrite("Already Connected with PID - ~p~n",[Pid]), + {ok, Pid} + end. + +disconnect() -> + case whereis(plugconndb1) of + undefined -> + {ok, {"there no ODBC plugconndb1"}}; + Pid -> + odbc:disconnect(Pid), + {ok, {"disconnected -", Pid}} + end. + +write(Que1,Que2) -> + + case whereis(plugconndb1) of + undefined -> + case emq_plugin_2db:connect() of + {ok,Reason} -> + write(Que1,Que2); + {error,Reason} -> + {error, Reason} + end; + Pid -> + + case odbc:param_query(Pid,Que1,Que2) of + ResultTuple -> + io:format("odbc:param_query Result of Writing to DB: ~p~n", [ResultTuple]); + {error,Reason} -> + io:format("odbc:param_query Error in Writing to DB: ~p~n", [Reason]) + end + + end. + +try_get_val(Key,MessageMaps,Dt) -> + case maps:is_key(Key ,MessageMaps) of + true-> + maps:get(Key,MessageMaps); + false-> + if + Dt == "int" -> 0; + Dt == "string" -> ""; + true -> "" + end; + undefined -> + if + Dt == "int" -> 0; + Dt == "string" -> ""; + true -> "" + end + end. + +%% Called when the plugin application start +load(EnvMap) -> + emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [EnvMap]). + + + +on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _EnvMap) -> + {ok, Message}; + +on_message_publish(Message, EnvMap) -> + io:format("EnvMap in Publish ~p~n", [EnvMap]), + io:format("publish ~s~n", [emqttd_message:format(Message)]), + % io:format("Raw.... Msg.....~p~n ", [Message]), + + TopicBin = element(5, Message), + + ReqTopicsBinList = [<<"sgcd/solvent-live">>], + case lists:member(TopicBin, maps:get(topics, EnvMap)) of + true -> + TopicStr = binary_to_list(TopicBin), + MessageBin = element(12, Message), + ReqkeysList = [<<"ppm">>,<<"temp">>,<<"suid">>], + case jsx:is_json(MessageBin) of + true -> + MessageMaps = jsx:decode(MessageBin, [return_maps]), + case lists:any(fun (Elem) -> lists:member(Elem, ReqkeysList) end, maps:keys(MessageMaps) ) of + true-> + Timestamp1 = lists:flatten(io_lib:format("~p", [element(1, element(13, Message))])), + Timestamp2 = lists:flatten(io_lib:format("~p", [element(2, element(13, Message))])), + Timestamp3 = lists:flatten(io_lib:format("~p", [element(3, element(13, Message))])), + UsernameBin = element(2, element(4, Message)), + ClientBin = element(1,element(4, Message)), + ClientStr = binary_to_list(ClientBin), + UsernameStr = binary_to_list(UsernameBin), + + + + Timetemp = string:concat(Timestamp1,Timestamp2), + TimestampStr = string:concat(Timetemp,Timestamp3), + + + Que1 = maps:get(que1, EnvMap), + Que2 = [ {{sql_varchar, 80}, [TimestampStr]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<"suid">>, MessageMaps,"int")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<"ppm">>, MessageMaps,"int")]}, + {sql_integer , [emq_plugin_2db:try_get_val(<<"temp">>, MessageMaps,"int")]} + ], + + io:format("Que1.....~p~n ", [Que1]), + io:format("Que2.....~p~n ", [Que2]), + + write(Que1,Que2), + {ok, Message}; + false-> + {error, "no required Key in JSON"} + end; + false -> + {error, "not json string"}; + + undefined -> + {error, "dont'know, undefined"} + end; + false-> + {error ,"Topic Not mactched"}; + undefined -> + {error ,"Topic undefined"} + end. + + + + +%% Called when the plugin application stop +unload() -> + emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2). + diff --git a/src/emq_plugin_2db_app.erl b/src/emq_plugin_2db_app.erl new file mode 100644 index 0000000..d48aee2 --- /dev/null +++ b/src/emq_plugin_2db_app.erl @@ -0,0 +1,52 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emq_plugin_2db_app). + +-behaviour(application). + +%% Application callbacks +-export([start/2, stop/1]). + +-define(APP, emq_plugin_2db). + +start(_StartType, _StartArgs) -> + {ok, Sup} = emq_plugin_2db_sup:start_link(), + ok = emqttd_access_control:register_mod(auth, emq_auth_demo, []), + ok = emqttd_access_control:register_mod(acl, emq_acl_demo, []), + io:format("get_key env ~p~n", [application:get_key(env)]), + Env = element(2,application:get_key(env)), + + Odbc = element(2,lists:keyfind(odbc,1,Env)), + Topics = element(2,lists:keyfind(topics,1,Env)), + ReqkeysList = element(2,lists:keyfind(reqlist,1,Env)), + Que1s = element(2,lists:keyfind(que1,1,Env)), + Que2s = element(2,lists:keyfind(que2,1,Env)), + + io:format("Odbc ~p~n", [Odbc]), + io:format("Topics ~p~n", [Topics]), + io:format("ReqkeysList env ~p~n", [ReqkeysList]), + io:format("Que1s env ~p~n", [Que1s]), + io:format("Que2s env ~p~n", [Que2s]), + + % EnvMap = maps:from_list(Env), + emq_plugin_2db:load(Odbc,Topics,ReqkeysList,Que1s,Que2s), + {ok, Sup}. + +stop(_State) -> + ok = emqttd_access_control:unregister_mod(auth, emq_auth_demo), + ok = emqttd_access_control:unregister_mod(acl, emq_acl_demo), + emq_plugin_2db:unload(). diff --git a/test/emq_plugin_2db_SUITE.erl b/test/emq_plugin_2db_SUITE.erl new file mode 100644 index 0000000..b4361a0 --- /dev/null +++ b/test/emq_plugin_2db_SUITE.erl @@ -0,0 +1,8 @@ + +-module(emq_plugin_2db_SUITE). + +-compile(export_all). + +all() -> []. + +groups() -> [].