Skip to content

Commit

Permalink
added metric from current table
Browse files Browse the repository at this point in the history
  • Loading branch information
markus812498 committed Dec 1, 2023
1 parent cd42069 commit 8861a7a
Showing 1 changed file with 20 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,12 @@
{4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"}
]},

{stream_consumer_metrics, [
{2, undefined, consumed, counter, "Total number of messages consumed on connection", consumed},
{2, undefined, offset, counter, "Total offset of consuming connection", offset},
{2, undefined, offset_lag, counter, "Total offset lag of consuming connection", offset_lag}
]},
{rabbit_stream_consumer_created, [
{2, undefined, stream_consumer_offset, counter, "Total nunber of offset", offset},
{2, undefined, stream_consumer_offset_lag, counter, "Total offset lag of connection", offset_lag},
{2, undefined, stream_consumer_consumed_total, counter, "Total number of messages consumed on connection", consumed}
]
},

{connection_metrics, [
{2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt},
Expand Down Expand Up @@ -420,6 +421,8 @@ label(M) when is_map(M) ->
end, <<>>, M);
label(#resource{virtual_host = VHost, kind = exchange, name = Name}) ->
<<"vhost=\"", VHost/binary, "\",exchange=\"", Name/binary, "\"">>;
label({#resource{virtual_host = VHost, kind = queue, name = Name}, P, _}) when is_pid(P) ->
<<"vhost=\"", VHost/binary, "\",queue=\"", Name/binary, "\"","channel=\"",(iolist_to_binary(pid_to_list(P)))/binary, "\"">>;
label(#resource{virtual_host = VHost, kind = queue, name = Name}) ->
<<"vhost=\"", VHost/binary, "\",queue=\"", Name/binary, "\"">>;
label({P, {#resource{virtual_host = QVHost, kind = queue, name = QName},
Expand Down Expand Up @@ -503,15 +506,6 @@ get_data(connection_metrics = Table, false, _, _) ->
sum(proplists:get_value(channels, Props), A4)}
end, empty(Table), Table),
[{Table, [{recv_cnt, A1}, {send_cnt, A2}, {send_pend, A3}, {channels, A4}]}];
get_data(stream_consumer_metrics = Table, false, _, _) ->
{Table, A1, A2, A3, A4} = ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4}) ->
{T,
sum(proplists:get_value(credits, Props), A1),
sum(proplists:get_value(consumed, Props), A2),
sum(proplists:get_value(offset, Props), A3),
sum(proplists:get_value(offset_lag, Props), A4)}
end, empty(Table), Table),
[{Table, [{credits, A1}, {consumed, A2}, {offset, A3}, {offset_lag, A4}]}];
get_data(channel_metrics = Table, false, _, _) ->
{Table, A1, A2, A3, A4, A5, A6, A7} =
ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4, A5, A6, A7}) ->
Expand All @@ -527,6 +521,17 @@ get_data(channel_metrics = Table, false, _, _) ->
[{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
{global_prefetch_count, A7}]}];
get_data(rabbit_stream_consumer_created = Table, false, _, _) ->
%% Table = rabbit_stream_consumer_created, %% real table name
{Table, A1, A2, A3} = ets:foldl(fun({{{_},_}, Props}, {T, A1, A2, A3}) ->
{T,
sum(proplists:get_value(offset, Props), A1),
sum(proplists:get_value(offset_lag, Props), A2),
sum(proplists:get_value(consumed, Props), A3)
}
end, empty(Table), Table),
[{Table, [{offset, A1}, {offset_lag, A2}, {consumed, A3}]}];

get_data(queue_consumer_count = MF, false, VHostsFilter, QueuesFilter) ->
Table = queue_metrics, %% Real table name
{_, A1} = ets:foldl(fun
Expand Down Expand Up @@ -709,7 +714,7 @@ accumulate_count_and_sum(Value, {Count, Sum}) ->

empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
{T, 0};
empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics ->
empty(T) when T == connection_coarse_metrics; T == rabbit_stream_consumer_created; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics ->
{T, 0, 0, 0};
empty(T) when T == channel_exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
{T, 0, 0, 0, 0};
Expand Down

0 comments on commit 8861a7a

Please sign in to comment.