Skip to content

Commit

Permalink
feat: export offline messages metric
Browse files Browse the repository at this point in the history
  • Loading branch information
dhruvjain99 committed Oct 18, 2024
1 parent 8c7ead5 commit c165338
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 48 deletions.
103 changes: 67 additions & 36 deletions apps/vmq_server/src/vmq_message_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
read/2,
delete/1,
delete/2,
find/1
find/1,
nr_of_offline_messages/0
]).

-define(OFFLINE_MESSAGES, offline_messages).

start() ->
Ret = supervisor:start_link({local, ?MODULE}, ?MODULE, []),
load_redis_functions(),
Expand All @@ -27,7 +30,9 @@ load_redis_functions() ->

{ok, PopOfflineMessageScript} = file:read_file(LuaDir ++ "/pop_offline_message.lua"),
{ok, WriteOfflineMessageScript} = file:read_file(LuaDir ++ "/write_offline_message.lua"),
{ok, DeleteSubsOfflineMessagesScript} = file:read_file(LuaDir ++ "/delete_subs_offline_messages.lua"),
{ok, DeleteSubsOfflineMessagesScript} = file:read_file(
LuaDir ++ "/delete_subs_offline_messages.lua"
),

{ok, <<"pop_offline_message">>} = vmq_redis:query(
vmq_message_store_redis_client,
Expand All @@ -49,50 +54,68 @@ load_redis_functions() ->
).

write(SubscriberId, Msg) ->
% TODO: Handle the return value(errors & negative) to generate offline messages metrics
vmq_redis:query(
vmq_message_store_redis_client,
[
case
vmq_redis:query(
vmq_message_store_redis_client,
[
?FCALL,
?WRITE_OFFLINE_MESSAGE,
1,
term_to_binary(SubscriberId),
term_to_binary(Msg)
],
?FCALL,
?WRITE_OFFLINE_MESSAGE,
1,
term_to_binary(SubscriberId),
term_to_binary(Msg)
],
?FCALL,
?WRITE_OFFLINE_MESSAGE
).
?WRITE_OFFLINE_MESSAGE
)
of
{ok, OfflineMsgCount} ->
ets:insert(?OFFLINE_MESSAGES, {count, binary_to_integer(OfflineMsgCount)});
{error, _} ->
{error, not_supported}
end.

read(_SubscriberId, _MsgRef) ->
{error, not_supported}.

delete(SubscriberId) ->
% TODO: Handle the return value(errors & negatives) to generate offline messages metrics
vmq_redis:query(
vmq_message_store_redis_client,
[
case
vmq_redis:query(
vmq_message_store_redis_client,
[
?FCALL,
?DELETE_SUBS_OFFLINE_MESSAGES,
1,
term_to_binary(SubscriberId)
],
?FCALL,
?DELETE_SUBS_OFFLINE_MESSAGES,
1,
term_to_binary(SubscriberId)
],
?FCALL,
?DELETE_SUBS_OFFLINE_MESSAGES
).
?DELETE_SUBS_OFFLINE_MESSAGES
)
of
{ok, OfflineMsgCount} ->
ets:insert(?OFFLINE_MESSAGES, {count, binary_to_integer(OfflineMsgCount)});
{error, _} ->
{error, not_supported}
end.

delete(SubscriberId, _MsgRef) ->
% TODO: Handle the return value(errors & negatives) to generate offline messages metrics
vmq_redis:query(
vmq_message_store_redis_client,
[
case
vmq_redis:query(
vmq_message_store_redis_client,
[
?FCALL,
?POP_OFFLINE_MESSAGE,
1,
term_to_binary(SubscriberId)
],
?FCALL,
?POP_OFFLINE_MESSAGE,
1,
term_to_binary(SubscriberId)
],
?FCALL,
?POP_OFFLINE_MESSAGE
).
?POP_OFFLINE_MESSAGE
)
of
{ok, OfflineMsgCount} ->
ets:insert(?OFFLINE_MESSAGES, {count, binary_to_integer(OfflineMsgCount)});
{error, _} ->
{error, not_supported}
end.

find(SubscriberId) ->
case
Expand All @@ -118,6 +141,12 @@ find(SubscriberId) ->
Res
end.

nr_of_offline_messages() ->
case ets:lookup(?OFFLINE_MESSAGES, count) of
[] -> 0;
[{count, Count}] -> Count
end.

%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
Expand All @@ -128,6 +157,8 @@ find(SubscriberId) ->
{atom(), {atom(), atom(), list()}, permanent, pos_integer(), worker, [atom()]}
]}}.
init([]) ->
ets:new(?OFFLINE_MESSAGES, [named_table, public, {write_concurrency, true}]),

StoreCfgs = application:get_env(vmq_server, message_store, [
{redis, [
{connect_options, "[{sentinel, [{endpoints, [{\"localhost\", 26379}]}]},{database,2}]"}
Expand Down
16 changes: 4 additions & 12 deletions apps/vmq_server/src/vmq_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2158,7 +2158,8 @@ misc_statistics() ->
{router_memory, SMemory},
{retain_messages, NrOfRetain},
{retain_memory, RMemory},
{queue_processes, fetch_external_metric(vmq_queue_sup_sup, nr_of_queues, 0)}
{queue_processes, fetch_external_metric(vmq_queue_sup_sup, nr_of_queues, 0)},
{offline_messages, fetch_external_metric(vmq_message_store, nr_of_offline_messages, 0)}
].

-spec misc_stats_def() -> [metric_def()].
Expand Down Expand Up @@ -2192,7 +2193,8 @@ misc_stats_def() ->
retain_memory,
<<"The number of bytes used for storing retained messages.">>
),
m(gauge, [], queue_processes, queue_processes, <<"The number of MQTT queue processes.">>)
m(gauge, [], queue_processes, queue_processes, <<"The number of MQTT queue processes.">>),
m(gauge, [], offline_messages, offline_messages, <<"The number of offline messages">>)
].

-spec system_statistics() -> [{metric_id(), any()}].
Expand Down Expand Up @@ -2720,18 +2722,9 @@ met2idx({?REDIS_STALE_CMD, ?FCALL, ?ENQUEUE_MSG}) -> 271;
met2idx({?REDIS_STALE_CMD, ?FCALL, ?POLL_MAIN_QUEUE}) -> 272;
met2idx({?UNAUTH_REDIS_CMD, ?FCALL, ?ENQUEUE_MSG}) -> 273;
met2idx({?UNAUTH_REDIS_CMD, ?FCALL, ?POLL_MAIN_QUEUE}) -> 274;
met2idx({?REDIS_CMD, ?RPUSH, ?MSG_STORE_WRITE}) -> 285;
met2idx({?REDIS_CMD, ?DEL, ?MSG_STORE_DELETE}) -> 286;
met2idx({?REDIS_CMD, ?FIND, ?MSG_STORE_FIND}) -> 287;
met2idx({?REDIS_CMD_ERROR, ?RPUSH, ?MSG_STORE_WRITE}) -> 288;
met2idx({?REDIS_CMD_ERROR, ?DEL, ?MSG_STORE_DELETE}) -> 289;
met2idx({?REDIS_CMD_ERROR, ?FIND, ?MSG_STORE_FIND}) -> 290;
met2idx({?REDIS_CMD_MISS, ?RPUSH, ?MSG_STORE_WRITE}) -> 291;
met2idx({?REDIS_CMD_MISS, ?DEL, ?MSG_STORE_DELETE}) -> 292;
met2idx({?REDIS_CMD_MISS, ?FIND, ?MSG_STORE_FIND}) -> 293;
met2idx({?REDIS_CMD, ?LPOP, ?MSG_STORE_DELETE}) -> 294;
met2idx({?REDIS_CMD_ERROR, ?LPOP, ?MSG_STORE_DELETE}) -> 295;
met2idx({?REDIS_CMD_MISS, ?LPOP, ?MSG_STORE_DELETE}) -> 296;
met2idx({?QOS1_SUBSCRIPTION_OPTS, ?NON_RETRY, ?NON_PERSISTENCE}) -> 297;
met2idx({?QOS1_SUBSCRIPTION_OPTS, ?RETRY, ?NON_PERSISTENCE}) -> 298;
met2idx({?QOS1_SUBSCRIPTION_OPTS, ?NON_RETRY, ?PERSISTENCE}) -> 299;
Expand Down Expand Up @@ -2812,7 +2805,6 @@ met2idx({?UNAUTH_REDIS_CMD, ?FCALL, ?DELETE_SUBS_OFFLINE_MESSAGES}) -> 373;
met2idx({?REDIS_CMD, ?FUNCTION_LOAD, ?DELETE_SUBS_OFFLINE_MESSAGES}) -> 374;
met2idx({?REDIS_CMD_ERROR, ?FUNCTION_LOAD, ?DELETE_SUBS_OFFLINE_MESSAGES}) -> 375.


-ifdef(TEST).
clear_stored_rates() ->
gen_server:call(?MODULE, clear_rates).
Expand Down
1 change: 1 addition & 0 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
xmerl,
vmq_server,
vernemq_dev,
debugger,
{cuttlefish, load},
{vmq_plumtree, load},
{plumtree, load},
Expand Down

0 comments on commit c165338

Please sign in to comment.