Skip to content

Commit

Permalink
""
Browse files Browse the repository at this point in the history
  • Loading branch information
arvindh123 committed Apr 4, 2019
1 parent 92c9bbe commit 62d0866
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 30 deletions.
12 changes: 6 additions & 6 deletions etc/emq_plugin_2db.config
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
{que2,
[

"[ {{sql_varchar, 80}, [TimestampStr]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"suid\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"temp\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"ppm\">>, MessageMaps,\"int\")]}
"[ {{sql_varchar, 80}, [emq_plugin_2db:try_get_ts(TimestampStr, MessageMaps)] },
{sql_integer , [emq_plugin_2db:try_get_val(<<\"suid\">>, MessageMaps,\"int\")] },
{sql_integer , [emq_plugin_2db:try_get_val(<<\"temp\">>, MessageMaps,\"int\")] },
{sql_integer , [emq_plugin_2db:try_get_val(<<\"ppm\">>, MessageMaps,\"int\")] }
]\.",

"[ {{sql_varchar, 80}, [TimestampStr]},
"[ {{sql_varchar, 80}, [emq_plugin_2db:try_get_ts(TimestampStr, MessageMaps)] },
{sql_integer , [emq_plugin_2db:try_get_val(<<\"suid1\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"temp1\">>, MessageMaps,\"int\")]},
{sql_integer , [emq_plugin_2db:try_get_val(<<\"ppm1\">>, MessageMaps,\"int\")]}
Expand Down
47 changes: 30 additions & 17 deletions src/emq_plugin_2db.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

-export([on_message_publish/6]).

-export([init/0, deinit/0, connect/1, disconnect/0, write/3, try_get_val/3 ]).
-export([init/0, deinit/0, connect/2, disconnect/1, write/4, try_get_val/3, try_get_ts/2 ]).

init()->

Expand All @@ -49,12 +49,12 @@ deinit()->
{ok, "Stopped odbc"}
end.

connect(Odbc) ->
case whereis(plugconndb1) of
connect(Odbc,NamePid) ->
case whereis(NamePid) of
undefined ->
case odbc:connect("DSN="++lists:nth(1,Odbc)++";UID="++lists:nth(2,Odbc)++";PWD="++lists:nth(3,Odbc), []) of
{ok, Pid}->
try register(plugconndb1, Pid)
try register(NamePid, Pid)
catch
error:X ->
io:fwrite(X)
Expand All @@ -69,7 +69,7 @@ connect(Odbc) ->
Value == 'odbc_not_started' ->
case emq_plugin_2db:init() of
{ok,Val} ->
emq_plugin_2db:connect(Odbc)
emq_plugin_2db:connect(Odbc,NamePid)
end;
true->
io:fwrite("~p ~n", [Value]),
Expand All @@ -82,37 +82,48 @@ connect(Odbc) ->
{ok, Pid}
end.

disconnect() ->
case whereis(plugconndb1) of
disconnect(NamePid) ->
case whereis(NamePid) of
undefined ->
{ok, {"there no ODBC plugconndb1"}};
{ok, {"there no ODBC "}};
Pid ->
odbc:disconnect(Pid),
{ok, {"disconnected -", Pid}}
end.

write(Odbc,Que1,Que2) ->
write(Odbc,NamePid,Que1,Que2) ->

case whereis(plugconndb1) of
case whereis(NamePid) of
undefined ->
case emq_plugin_2db:connect(Odbc) of
case emq_plugin_2db:connect(Odbc,NamePid) of
{ok,Reason} ->
write(Odbc,Que1,Que2);
write(Odbc,NamePid,Que1,Que2);
{error,Reason} ->
{error, Reason}
end;
Pid ->

case odbc:param_query(Pid,Que1,Que2) of
ResultTuple ->
ok;
% io:format("odbc:param_query Result of Writing to DB: ~p~n", [ResultTuple]);
{error,Reason} ->
io:format("odbc:param_query Error in Writing to DB: ~p~n", [Reason])
io:format("odbc:param_query Error in Writing to DB: ~p~n", [Reason]);
ResultTuple ->
ok
% io:format("odbc:param_query ResultTuple: ~p~n", [ResultTuple])

end

end.

try_get_ts(TimestampStr,MessageMaps) ->
case maps:is_key(<<"ts">> ,MessageMaps) of
true->
binary_to_list(maps:get(<<"ts">> ,MessageMaps));
false->
TimestampStr;
undefined ->
TimestampStr
end.

try_get_val(Key,MessageMaps,Dt) ->
case maps:is_key(Key ,MessageMaps) of
true->
Expand Down Expand Up @@ -162,6 +173,8 @@ on_message_publish(Message, Odbc, Topics,ReqkeysList,Que1s,Que2s) ->
Timestamp3 = lists:flatten(io_lib:format("~p", [element(3, element(13, Message))])),
UsernameBin = element(2, element(4, Message)),
ClientBin = element(1,element(4, Message)),
ClientBinRe = re:replace(ClientBin, "\\s+", "", [global,{return,binary}]),
NamePid = binary_to_atom(<<"pid_", ClientBinRe/binary>>, unicode),
ClientStr = binary_to_list(ClientBin),
UsernameStr = binary_to_list(UsernameBin),

Expand All @@ -181,7 +194,7 @@ on_message_publish(Message, Odbc, Topics,ReqkeysList,Que1s,Que2s) ->
% io:format("Que1.....~p~n ", [Que1]),
% io:format("Que2.....~p~n ", [Que2]),

write(Odbc,Que1,Que2),
write(Odbc,NamePid,Que1,Que2),
{ok, Message};
false->
{error, "no required Key in JSON"}
Expand Down
7 changes: 0 additions & 7 deletions src/emq_plugin_2db_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,14 @@

start(_StartType, _StartArgs) ->
{ok, Sup} = emq_plugin_2db_sup:start_link(),
ok = emqttd_access_control:register_mod(auth, emq_auth_demo, []),
ok = emqttd_access_control:register_mod(acl, emq_acl_demo, []),
io:format("get_key env ~p~n", [application:get_key(env)]),
Env = element(2,application:get_key(env)),

Odbc = element(2,lists:keyfind(odbc,1,Env)),
Topics = element(2,lists:keyfind(topics,1,Env)),
ReqkeysList = element(2,lists:keyfind(reqlist,1,Env)),
Que1s = element(2,lists:keyfind(que1,1,Env)),
Que2s = element(2,lists:keyfind(que2,1,Env)),

emq_plugin_2db:load(Odbc,Topics,ReqkeysList,Que1s,Que2s),
{ok, Sup}.

stop(_State) ->
ok = emqttd_access_control:unregister_mod(auth, emq_auth_demo),
ok = emqttd_access_control:unregister_mod(acl, emq_acl_demo),
emq_plugin_2db:unload().

0 comments on commit 62d0866

Please sign in to comment.