Skip to content

Commit

Permalink
""
Browse files Browse the repository at this point in the history
  • Loading branch information
arvindh123 committed Apr 4, 2019
0 parents commit 5965cc3
Show file tree
Hide file tree
Showing 12 changed files with 624 additions and 0 deletions.
21 changes: 21 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
PROJECT = emq_plugin_2db
PROJECT_DESCRIPTION = EMQ Plugin 2 DB
PROJECT_VERSION = 2.3.11

BUILD_DEPS = emqttd cuttlefish
dep_emqttd = git https://github.com/emqtt/emqttd master
dep_cuttlefish = git https://github.com/emqtt/cuttlefish v2.0.11

ERLC_OPTS += +debug_info
ERLC_OPTS += +'{parse_transform, lager_transform}'

NO_AUTOPATCH = cuttlefish

COVER = true

include erlang.mk

app:: rebar.config

app.config::
./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emq_plugin_2db.conf -i priv/emq_plugin_2db.schema -d data
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
emq-plugin-2db
===================

This Plugin Helps to write to EMQ Published data to DB with ODBC
2 changes: 2 additions & 0 deletions emq_plugin_2db.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

COMPILE_FIRST +=
1 change: 1 addition & 0 deletions erlang.mk
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../erlang.mk
47 changes: 47 additions & 0 deletions etc/emq_plugin_2db.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@

[
{emq_plugin_2db, [
{odbc, ["mysqlserver","test","test"] },
{topics ,
[
{<<"sgcd/solvent-live">>,1},
{<<"sgcd/solvent-status">>,2}
]
},

{reqlist,
[
[<<"ppm">>,<<"temp">>,<<"suid">>],
[<<"ppm1">>,<<"temp1">>,<<"suid1">>]
]

},

{que1 ,
[
"INSERT INTO [Marposs].[dbo].[sgcdSolventL1] ( [ts] ,[suid] ,[ppm] ,[temp]) VALUES( ? ,? , ? , ?)",
"INSERT INTO [Marposs].[dbo].[sgcdSolventL1] ( [ts] ,[suid] ,[ppm] ,[temp]) VALUES( ? ,? , ? , ?)"
]

},

{que2,
[

"[ {{sql_varchar, 80}, [TimestampStr]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"suid\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"temp\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"ppm\">>, MessageMaps,\"int\")]}
]\.",

"[ {{sql_varchar, 80}, [TimestampStr]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"suid1\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"temp1\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"ppm1\">>, MessageMaps,\"int\")]}
]\."

]
}
]}
].

40 changes: 40 additions & 0 deletions etc/emq_plugin_2dbV1.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@

[
{emq_plugin_2db, [
{odbc, ["mysql","test","test"] },
{topics ,
[
<<"sgcd/solvent-live">>,
<<"sgcd/solvent-status">>
]
},

{que1 ,
[
"INSERT INTO [Marposs].[dbo].[sgcdSolventL1] ( [ts] ,[suid] ,[ppm] ,[temp]) VALUES( ? ,? , ? , ?)",
"INSERT INTO [Marposs].[dbo].[sgcdSolventL1] ( [ts] ,[suid1] ,[ppm1] ,[temp1]) VALUES( ? ,? , ? , ?)"
]

},

{que2,
[

"[ {{sql_varchar, 80}, [TimestampStr]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"suid\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"temp\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"ppm\">>, MessageMaps,\"int\")]}\.
]",

"[ {{sql_varchar, 80}, [TimestampStr]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"suid1\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"temp1\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"ppm1\">>, MessageMaps,\"int\")]}\.
]"

]
}
]}
].


4 changes: 4 additions & 0 deletions rebar.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{deps, [

]}.
{erl_opts, [debug_info,{parse_transform,lager_transform}]}.
32 changes: 32 additions & 0 deletions src/emq_plugin_2b_sup.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emq_plugin_2db_sup).

-behaviour(supervisor).

%% API
-export([start_link/0]).

%% Supervisor callbacks
-export([init/1]).

start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
{ok, { {one_for_one, 0, 1}, []} }.

206 changes: 206 additions & 0 deletions src/emq_plugin_2db.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(emq_plugin_2db).

-include_lib("emqttd/include/emqttd.hrl").


-import(string,[len/1]).

-export([load/5, unload/0]).

%% Hooks functions


-export([on_message_publish/6]).

-export([init/0, deinit/0, connect/1, disconnect/0, write/3, try_get_val/3 ]).

init()->

case odbc:start() of
ok ->
io:fwrite("Started ODBC ~n"),
{ok, "Started odbc"};

{error,{already_started,odbc}} ->
io:fwrite("Already Connceted ~n"),
{error, "already started odbc"}
end.

deinit()->
case odbc:stop() of
ok ->
io:fwrite("Stoped ODBC ~n"),
{ok, "Stopped odbc"}
end.

connect(Odbc) ->
case whereis(plugconndb1) of
undefined ->
case odbc:connect("DSN="++lists:nth(1,Odbc)++";UID="++lists:nth(2,Odbc)++";PWD="++lists:nth(3,Odbc), []) of
{ok, Pid}->
try register(plugconndb1, Pid)
catch
error:X ->
io:fwrite(X)


end,
io:fwrite("Connected successfully ~n"),
{ok,{"Connected Pid- ", Pid}};
{error, Value} ->
io:fwrite("~p ~n", [Value]),
if
Value == 'odbc_not_started' ->
case emq_plugin_2db:init() of
{ok,Val} ->
emq_plugin_2db:connect(Odbc)
end;
true->
io:fwrite("~p ~n", [Value]),
{error,Value}
end

end;
Pid ->
io:fwrite("Already Connected with PID - ~p~n",[Pid]),
{ok, Pid}
end.

disconnect() ->
case whereis(plugconndb1) of
undefined ->
{ok, {"there no ODBC plugconndb1"}};
Pid ->
odbc:disconnect(Pid),
{ok, {"disconnected -", Pid}}
end.

write(Odbc,Que1,Que2) ->

case whereis(plugconndb1) of
undefined ->
case emq_plugin_2db:connect(Odbc) of
{ok,Reason} ->
write(Odbc,Que1,Que2);
{error,Reason} ->
{error, Reason}
end;
Pid ->

case odbc:param_query(Pid,Que1,Que2) of
ResultTuple ->
io:format("odbc:param_query Result of Writing to DB: ~p~n", [ResultTuple]);
{error,Reason} ->
io:format("odbc:param_query Error in Writing to DB: ~p~n", [Reason])
end

end.

try_get_val(Key,MessageMaps,Dt) ->
case maps:is_key(Key ,MessageMaps) of
true->
maps:get(Key,MessageMaps);
false->
if
Dt == "int" -> 0;
Dt == "string" -> "";
true -> ""
end;
undefined ->
if
Dt == "int" -> 0;
Dt == "string" -> "";
true -> ""
end
end.

%% Called when the plugin application start
load(Odbc,Topics,ReqkeysList,Que1s,Que2s) ->
emqttd:hook('message.publish', fun ?MODULE:on_message_publish/6, [Odbc,Topics,ReqkeysList,Que1s,Que2s]).



on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Odbc, _Topics, _ReqkeysList, _Que1s, _Que2s) ->
{ok, Message};

on_message_publish(Message, Odbc, Topics,ReqkeysList,Que1s,Que2s) ->
io:format("publish ~s~n", [emqttd_message:format(Message)]),

TopicBin = element(5, Message),

ReqTopicsBinList = [<<"sgcd/solvent-live">>],

case lists:keyfind(TopicBin, 1,Topics) of
{Topic, Index} ->
TopicStr = binary_to_list(TopicBin),
MessageBin = element(12, Message),

case jsx:is_json(MessageBin) of
true ->
MessageMaps = jsx:decode(MessageBin, [return_maps]),
case lists:any(fun (Elem) -> lists:member(Elem, lists:nth(Index,ReqkeysList)) end, maps:keys(MessageMaps) ) of
true->
Timestamp1 = lists:flatten(io_lib:format("~p", [element(1, element(13, Message))])),
Timestamp2 = lists:flatten(io_lib:format("~p", [element(2, element(13, Message))])),
Timestamp3 = lists:flatten(io_lib:format("~p", [element(3, element(13, Message))])),
UsernameBin = element(2, element(4, Message)),
ClientBin = element(1,element(4, Message)),
ClientStr = binary_to_list(ClientBin),
UsernameStr = binary_to_list(UsernameBin),



Timetemp = string:concat(Timestamp1,Timestamp2),
TimestampStr = string:concat(Timetemp,Timestamp3),


Que1 = lists:nth(Index,Que1s),
TQue2 = lists:nth(Index,Que2s),

{ok, Tokens, _} = erl_scan:string(TQue2),
{ok, Parsed} = erl_parse:parse_exprs(Tokens),
Bindings = [{'TimestampStr', TimestampStr}, {'MessageMaps', MessageMaps}],
{value, Que2, _} = erl_eval:exprs(Parsed, Bindings),
io:format("Que1.....~p~n ", [Que1]),
io:format("Que2.....~p~n ", [Que2]),

write(Odbc,Que1,Que2),
{ok, Message};
false->
{error, "no required Key in JSON"}
end;
false ->
{error, "not json string"};

undefined ->
{error, "dont'know, undefined"}
end;
false->
{error ,"Topic Not mactched"};
undefined ->
{error ,"Topic undefined"}
end.




%% Called when the plugin application stop
unload() ->
emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2).

Loading

0 comments on commit 5965cc3

Please sign in to comment.