From a77a26997f200637825e6a5fb36c1013e807050a Mon Sep 17 00:00:00 2001 From: Andrew Dryga Date: Wed, 10 May 2017 21:53:56 +0300 Subject: [PATCH 1/9] Renamed monitoring to clusterstatus module --- lib/annon_api/management_api/router.ex | 3 ++- .../monitoring/{monitoring.ex => cluster_status.ex} | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) rename lib/annon_api/monitoring/{monitoring.ex => cluster_status.ex} (89%) diff --git a/lib/annon_api/management_api/router.ex b/lib/annon_api/management_api/router.ex index 8fef813d7..24e83db4b 100644 --- a/lib/annon_api/management_api/router.ex +++ b/lib/annon_api/management_api/router.ex @@ -6,6 +6,7 @@ defmodule Annon.ManagementAPI.Router do use Plug.ErrorHandler alias Annon.Helpers.Response alias Annon.ManagementAPI.Render + alias Annon.Monitoring.ClusterStatus if Confex.get(:annon_api, :sql_sandbox) do plug Phoenix.Ecto.SQL.Sandbox @@ -36,7 +37,7 @@ defmodule Annon.ManagementAPI.Router do end get "/cluster_status" do - status = Annon.Monitoring.get_status() + status = ClusterStatus.get_cluster_status() Render.render_one({:ok, status}, conn) end diff --git a/lib/annon_api/monitoring/monitoring.ex b/lib/annon_api/monitoring/cluster_status.ex similarity index 89% rename from lib/annon_api/monitoring/monitoring.ex rename to lib/annon_api/monitoring/cluster_status.ex index daeb9fae2..98e63c5f9 100644 --- a/lib/annon_api/monitoring/monitoring.ex +++ b/lib/annon_api/monitoring/cluster_status.ex @@ -1,9 +1,9 @@ -defmodule Annon.Monitoring do +defmodule Annon.Monitoring.ClusterStatus do @moduledoc """ - Monitoring service for Annons clusters. + This module provides functions to collect status from all nodes in the Annon cluster. 
""" - def get_status do + def get_cluster_status do cluster_nodes = :erlang.nodes() cluster_strategy = get_cluster_strategy() nodes_status = From 6cc8b812febaf5b3633a00837f4173ef10f4363c Mon Sep 17 00:00:00 2001 From: Andrew Dryga Date: Thu, 11 May 2017 00:16:02 +0300 Subject: [PATCH 2/9] Reimplemented stats collection --- config/config.exs | 14 ++- config/prod.exs | 5 - config/test.exs | 5 +- lib/annon_api/application.ex | 5 + lib/annon_api/monitoring/metrics_collector.ex | 106 ++++++++++++++++++ .../monitoring/metrics_collector/packet.ex | 50 +++++++++ lib/annon_api/plugins/monitoring.ex | 11 +- mix.exs | 3 +- mix.lock | 4 +- test/unit/plugins/monitoring_test.exs | 56 ++++++--- 10 files changed, 218 insertions(+), 41 deletions(-) create mode 100644 lib/annon_api/monitoring/metrics_collector.ex create mode 100644 lib/annon_api/monitoring/metrics_collector/packet.ex diff --git a/config/config.exs b/config/config.exs index 183bc855a..445a03a2a 100644 --- a/config/config.exs +++ b/config/config.exs @@ -27,12 +27,6 @@ config :annon_api, :configuration_cache, adapter: {:system, :module, "CONFIGURATION_CACHE_ADAPTER", Annon.Configuration.CacheAdapters.ETS}, cache_space: :configuration -# TODO: Replace with statix -config :ex_statsd, - host: "localhost", - port: 8125, - namespace: "annon" - # Configure Elixir logger config :logger, level: :debug @@ -63,5 +57,13 @@ config :skycluster, config :annon_api, sql_sandbox: {:system, :boolean, "SQL_SANDBOX", false} +config :annon_api, :metrics_collector, + enabled?: {:system, :boolean, "METRICS_COLLECTOR_ENABLED", true}, + send_tags: {:system, :boolean, "METRICS_COLLECTOR_SEND_TAGS", true}, + host: {:system, :string, "METRICS_COLLECTOR_HOST", "localhost"}, + port: {:system, :number, "METRICS_COLLECTOR_PORT", 32768}, + namespace: {:system, :string, "METRICS_COLLECTOR_HOST", "annon"}, + sample_rate: {:system, :float, "METRICS_COLLECTOR_SAMPLE_RATE", 0.25} + import_config "plugins.exs" import_config "#{Mix.env}.exs" diff --git 
a/config/prod.exs b/config/prod.exs index dd6cb9e6b..568481132 100644 --- a/config/prod.exs +++ b/config/prod.exs @@ -1,10 +1,5 @@ use Mix.Config -config :ex_statsd, - host: "${STATSD_HOST}", - port: 8125, - namespace: "gateway" - config :skycluster, strategy: {:system, :module, "SKYCLUSTER_STRATEGY", Cluster.Strategy.Kubernetes}, kubernetes_selector: {:system, "SKYCLUSTER_KUBERNETES_SELECTOR", "app=annon,component=api"}, diff --git a/config/test.exs b/config/test.exs index 146fabba8..45d78423a 100644 --- a/config/test.exs +++ b/config/test.exs @@ -8,10 +8,9 @@ config :annon_api, Annon.Requests.Repo, database: System.get_env("MIX_LOGGER_TEST_DATABASE") || "annon_api_logger_test", pool: Ecto.Adapters.SQL.Sandbox -config :ex_statsd, +config :annon_api, :metrics_collector, sink: [], - namespace: "test", - test_mode: true + namespace: "test" config :annon_api, :acceptance, management: [ diff --git a/lib/annon_api/application.ex b/lib/annon_api/application.ex index a0db3101c..df97cbe25 100644 --- a/lib/annon_api/application.ex +++ b/lib/annon_api/application.ex @@ -15,6 +15,7 @@ defmodule Annon do children = [ supervisor(Annon.Configuration.Repo, []), supervisor(Annon.Requests.Repo, []), + worker(Annon.Monitoring.MetricsCollector, [metrics_collector_opts()]), worker(Annon.Configuration.Matcher, [matcher_opts()]), worker(Annon.AutoClustering, []), management_endpoint_spec(), @@ -34,6 +35,10 @@ defmodule Annon do Application.get_env(:annon_api, :configuration_cache) end + defp metrics_collector_opts do + Confex.get_map(:annon_api, :metrics_collector) + end + # Loads configuration in `:on_init` callbacks and replaces `{:system, ..}` tuples via Confex @doc false def load_from_system_env(config) do diff --git a/lib/annon_api/monitoring/metrics_collector.ex b/lib/annon_api/monitoring/metrics_collector.ex new file mode 100644 index 000000000..522566d54 --- /dev/null +++ b/lib/annon_api/monitoring/metrics_collector.ex @@ -0,0 +1,106 @@ +defmodule 
Annon.Monitoring.MetricsCollector do + @moduledoc """ + This module provides helper functions to persist meaningful metrics to StatsD or DogstatsD servers. + + Code is based on [Statix](https://github.com/lexmag/statix) library. + """ + use GenServer + alias AnnonMonitoring.MetricsCollector.Packet + + defstruct [:sock, :header, :config] + + @doc """ + Starts a metric collector process. + + `conn_opts` accepts connection arg + """ + def start_link(conn_opts) do + GenServer.start_link(__MODULE__, conn_opts, name: __MODULE__) + end + + @doc false + def init(conn_opts) do + enabled? = Keyword.get(conn_opts, :enabled?, true) + host = conn_opts |> Keyword.get(:host, "127.0.0.1") |> String.to_char_list() + port = Keyword.get(conn_opts, :port, 8125) + sink = Keyword.get(conn_opts, :sink, nil) + namespace = Keyword.get(conn_opts, :namespace, nil) + send_tags? = Keyword.get(conn_opts, :send_tags?, true) + + {:ok, address} = :inet.getaddr(host, :inet) + header = Packet.header(address, port) + + {:ok, socket} = :gen_udp.open(0, [active: false]) + + {:ok, %{ + enabled?: enabled?, + send_tags?: send_tags?, + header: [header | "#{namespace}."], + socket: socket, + sink: sink + }} + end + + def increment(key, val \\ 1, options \\ []) when is_number(val) do + transmit(:counter, key, val, options) + end + + def decrement(key, val \\ 1, options \\ []) when is_number(val) do + transmit(:counter, key, [?-, to_string(val)], options) + end + + def gauge(key, val, options \\ [] ) do + transmit(:gauge, key, val, options) + end + + def histogram(key, val, options \\ []) do + transmit(:histogram, key, val, options) + end + + def timing(key, val, options \\ []) do + transmit(:timing, key, val, options) + end + + @doc false + def transmit(type, key, val, options) when (is_binary(key) or is_list(key)) and is_list(options) do + sample_rate = Keyword.get(options, :sample_rate) + + if is_nil(sample_rate) or sample_rate >= :rand.uniform() do + GenServer.cast(__MODULE__, {:transmit, type, key, 
to_string(val), options}) + end + + :ok + end + + @doc false + def handle_cast({:transmit, _type, _key, _value, _options}, %{enabled?: false} = state), + do: {:noreply, state} + + # Transmits message to a sink + @doc false + def handle_cast({:transmit, type, key, value, options}, %{sink: sink} = state) when is_list(sink) do + %{header: header} = state + packet = %{type: type, key: key, value: value, options: options, header: header} + {:noreply, %{state | sink: [packet | sink]}} + end + + # Transmits message to a StatsD server + @doc false + def handle_cast({:transmit, type, key, value, options}, state) do + %{header: header, socket: socket, send_tags?: send_tags?} = state + + packet = Packet.build(header, type, key, value, send_tags?, options) + Port.command(socket, packet) + + receive do + {:inet_reply, _port, status} -> status + end + + {:noreply, state} + end + + @doc false + def handle_call(:flush, _from, state) do + {:reply, :ok, state} + end +end diff --git a/lib/annon_api/monitoring/metrics_collector/packet.ex b/lib/annon_api/monitoring/metrics_collector/packet.ex new file mode 100644 index 000000000..eeec267c3 --- /dev/null +++ b/lib/annon_api/monitoring/metrics_collector/packet.ex @@ -0,0 +1,50 @@ +defmodule AnnonMonitoring.MetricsCollector.Packet do + @moduledoc false + use Bitwise + + otp_release = :erlang.system_info(:otp_release) + @addr_family if(otp_release >= '19', do: [1], else: []) + + def header({n1, n2, n3, n4}, port) do + @addr_family ++ [ + band(bsr(port, 8), 0xFF), + band(port, 0xFF), + band(n1, 0xFF), + band(n2, 0xFF), + band(n3, 0xFF), + band(n4, 0xFF) + ] + end + + def build(header, name, key, val, send_tags?, options) do + [header, key, ?:, val, ?|, metric_type(name)] + |> set_option(:sample_rate, options[:sample_rate]) + |> set_option(:tags, options[:tags], send_tags?) 
+ end + + metrics = %{ + counter: "c", + gauge: "g", + histogram: "h", + timing: "ms", + set: "s" + } + + for {name, type} <- metrics do + defp metric_type(unquote(name)), do: unquote(type) + end + + defp set_option(packet, _kind, _sample_rate, _send_tags? \\ nil) + + defp set_option(packet, _kind, nil, _send_tags?), + do: packet + + defp set_option(packet, :sample_rate, sample_rate, _send_tags?) when is_float(sample_rate), + do: [packet | ["|@", :erlang.float_to_binary(sample_rate, [:compact, decimals: 2])]] + + defp set_option(packet, :tags, tags, false) when is_list(tags), + do: packet + + defp set_option(packet, :tags, tags, true) when is_list(tags), + do: [packet | ["|#", Enum.join(tags, ",")]] +end diff --git a/lib/annon_api/plugins/monitoring.ex b/lib/annon_api/plugins/monitoring.ex index 487fd46c3..4679181e5 100644 --- a/lib/annon_api/plugins/monitoring.ex +++ b/lib/annon_api/plugins/monitoring.ex @@ -8,6 +8,7 @@ defmodule Annon.Plugins.Monitoring do """ use Annon.Plugin, plugin_name: :monitoring alias Plug.Conn + alias Annon.Monitoring.MetricsCollector def validate_settings(changeset), do: changeset @@ -18,11 +19,9 @@ defmodule Annon.Plugins.Monitoring do def execute(%Conn{} = conn, %{api: api, start_time: request_start_time}, _settings) do api_tags = tags(conn, api) - conn - |> get_request_size() - |> ExStatsD.histogram("request_size", tags: api_tags) - ExStatsD.increment("request_count", tags: api_tags) + MetricsCollector.histogram("request_size", get_request_size(conn), tags: api_tags) + MetricsCollector.increment("request_count", 1, tags: api_tags) conn |> Conn.register_before_send(&write_metrics(&1, api)) @@ -41,8 +40,8 @@ defmodule Annon.Plugins.Monitoring do defp write_metrics(%Conn{} = conn, api) do api_tags = tags(conn, api) ++ ["http_status:#{to_string conn.status}"] - ExStatsD.timer(conn.assigns.latencies_client, "latency", tags: api_tags) - ExStatsD.increment("response_count", tags: api_tags) + MetricsCollector.timing("latency", 
conn.assigns.latencies_client, tags: api_tags) + MetricsCollector.increment("response_count", 1, tags: api_tags) conn end diff --git a/mix.exs b/mix.exs index 225774073..48a7973b1 100644 --- a/mix.exs +++ b/mix.exs @@ -26,7 +26,7 @@ defmodule Annon.Mixfile do def application do [ extra_applications: [:logger, :logger_json, :confex, :cowboy, :plug, :postgrex, :ecto, :joken, - :nex_json_schema, :poison, :httpoison, :ex_statsd, :skycluster, :eview, + :nex_json_schema, :poison, :httpoison, :skycluster, :eview, :ecto_paging, :runtime_tools], mod: {Annon, []} ] @@ -61,7 +61,6 @@ defmodule Annon.Mixfile do {:joken, "~> 1.4"}, {:nex_json_schema, "~> 0.6.0"}, {:httpoison, ">= 0.0.0"}, - {:ex_statsd, ">= 0.5.1"}, {:eview, ">= 0.0.0"}, {:ecto_paging, ">= 0.0.0"}, {:phoenix_ecto, ">= 0.0.0"}, diff --git a/mix.lock b/mix.lock index c73a0ab67..4f7073aa0 100644 --- a/mix.lock +++ b/mix.lock @@ -2,7 +2,7 @@ "benchfella": {:hex, :benchfella, "0.3.4", "41d2c017b361ece5225b5ba2e3b30ae53578c57c6ebc434417b4f1c2c94cf4f3", [:mix], []}, "bunt": {:hex, :bunt, "0.2.0", "951c6e801e8b1d2cbe58ebbd3e616a869061ddadcc4863d0a2182541acae9a38", [:mix], []}, "certifi": {:hex, :certifi, "1.1.0", "c9b71a547016c2528a590ccfc28de786c7edb74aafa17446b84f54e04efc00ee", [:rebar3], []}, - "confex": {:hex, :confex, "2.0.0", "dcb83f6c00029444b799efa430692bcb2d72cefb3428523f5e64f56d2e56a314", [:mix], []}, + "confex": {:hex, :confex, "2.0.1", "dab9be500ef618d2e9f4bd49c6907958f7c3a9915f3e27585ffc4b008517c6c7", [:mix], []}, "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], []}, "cors_plug": {:hex, :cors_plug, "1.2.1", "bbe1381a52e4a16e609cf3c4cbfde6884726a58b9a1a205db104dbdfc542f447", [:mix], [{:plug, "> 0.8.0", [hex: :plug, optional: false]}]}, "cowboy": {:hex, :cowboy, "1.1.2", "61ac29ea970389a88eca5a65601460162d370a70018afe6f949a29dca91f3bb0", [:rebar3], [{:cowlib, "~> 1.0.2", [hex: :cowlib, optional: false]}, {:ranch, "~> 1.3.2", [hex: :ranch, 
optional: false]}]}, @@ -18,7 +18,6 @@ "eview": {:hex, :eview, "0.10.8", "c0e858eb1bb34889b06517912894f2e6b1be71324790f7ca68e7d6fc3e8516a7", [:mix], [{:credit_card, "~> 1.0", [hex: :credit_card, optional: true]}, {:ecto, "~> 2.1", [hex: :ecto, optional: true]}, {:nex_json_schema, "~> 0.6.0", [hex: :nex_json_schema, optional: true]}, {:plug, "~> 1.3", [hex: :plug, optional: false]}, {:poison, "~> 3.1", [hex: :poison, optional: false]}]}, "ex_doc": {:hex, :ex_doc, "0.15.1", "d5f9d588fd802152516fccfdb96d6073753f77314fcfee892b15b6724ca0d596", [:mix], [{:earmark, "~> 1.1", [hex: :earmark, optional: false]}]}, "ex_machina": {:hex, :ex_machina, "2.0.0", "ec284c6f57233729cea9319e083f66e613e82549f78eccdb2059aeba5d0df9f3", [:mix], [{:ecto, "~> 2.1", [hex: :ecto, optional: true]}]}, - "ex_statsd": {:hex, :ex_statsd, "0.5.3", "e86dd97e25dbc80786e7d22b3c5537f2052a7e12daaaa7e6f2b9c34d03dbbd44", [:mix], []}, "excoveralls": {:hex, :excoveralls, "0.6.3", "894bf9254890a4aac1d1165da08145a72700ff42d8cb6ce8195a584cb2a4b374", [:mix], [{:exjsx, "~> 3.0", [hex: :exjsx, optional: false]}, {:hackney, ">= 0.12.0", [hex: :hackney, optional: false]}]}, "exjsx": {:hex, :exjsx, "3.2.1", "1bc5bf1e4fd249104178f0885030bcd75a4526f4d2a1e976f4b428d347614f0f", [:mix], [{:jsx, "~> 2.8.0", [hex: :jsx, optional: false]}]}, "hackney": {:hex, :hackney, "1.8.0", "8388a22f4e7eb04d171f2cf0285b217410f266d6c13a4c397a6c22ab823a486c", [:rebar3], [{:certifi, "1.1.0", [hex: :certifi, optional: false]}, {:idna, "4.0.0", [hex: :idna, optional: false]}, {:metrics, "1.0.1", [hex: :metrics, optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, optional: false]}]}, @@ -33,7 +32,6 @@ "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], []}, "nex_json_schema": {:hex, :nex_json_schema, "0.6.0", "2e87c13defd7dc426549034e778a1b4e2892a283def30bf4af323963aa5bc431", [:mix], []}, "phoenix_ecto": {:hex, 
:phoenix_ecto, "3.2.3", "450c749876ff1de4a78fdb305a142a76817c77a1cd79aeca29e5fc9a6c630b26", [:mix], [{:ecto, "~> 2.1", [hex: :ecto, optional: false]}, {:phoenix_html, "~> 2.9", [hex: :phoenix_html, optional: true]}, {:plug, "~> 1.0", [hex: :plug, optional: false]}]}, - "phoenix_html": {:hex, :phoenix_html, "2.9.3", "1b5a2122cbf743aa242f54dced8a4f1cc778b8bd304f4b4c0043a6250c58e258", [:mix], [{:plug, "~> 1.0", [hex: :plug, optional: false]}]}, "plug": {:hex, :plug, "1.3.5", "7503bfcd7091df2a9761ef8cecea666d1f2cc454cbbaf0afa0b6e259203b7031", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1", [hex: :cowboy, optional: true]}, {:mime, "~> 1.0", [hex: :mime, optional: false]}]}, "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], []}, "poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], []}, diff --git a/test/unit/plugins/monitoring_test.exs b/test/unit/plugins/monitoring_test.exs index 34a883a68..35c9dc3ac 100644 --- a/test/unit/plugins/monitoring_test.exs +++ b/test/unit/plugins/monitoring_test.exs @@ -3,26 +3,12 @@ defmodule Annon.Plugins.MonitoringTest do use Annon.UnitCase setup do - :sys.replace_state ExStatsD, fn state -> + :sys.replace_state Annon.Monitoring.MetricsCollector, fn state -> Map.update!(state, :sink, fn _prev_state -> [] end) end end test "metrics work properly" do - make_connection() - - assert [ - "test.response_count:1|c|#http_host:www.example.com,http_method:GET,http_port:80" - <> ",api_name:Montoring Test api," <> _, - "test.latency:" <> _, - "test.request_count:1|c|#http_host:www.example.com,http_method:GET,http_port:80" - <> ",api_name:Montoring Test api" <> _, - "test.request_size:28|h|#http_host:www.example.com,http_method:GET,http_port:80" - <> ",api_name:Montoring Test api" <> _, - ] = sent() - end - - defp make_connection do api = Annon.ConfigurationFactory.insert(:api, %{ name: "Montoring Test api", request: 
Annon.ConfigurationFactory.build(:api_request, %{host: "www.example.com", path: "/apis"}) @@ -42,9 +28,47 @@ defmodule Annon.Plugins.MonitoringTest do "/apis" |> call_public_router() + + [ + %{ + key: "response_count", + options: [tags: ["http_host:www.example.com", "http_method:GET", + "http_port:80", "api_name:Montoring Test api", + "api_id:" <> _, + "request_id:" <> _, "http_status:200"]], + type: :counter, + value: "1" + }, + %{ + key: "latency", + options: [tags: ["http_host:www.example.com", "http_method:GET", + "http_port:80", "api_name:Montoring Test api", + "api_id:" <> _, + "request_id:" <> _, "http_status:200"]], + type: :timing, value: _ + }, + %{ + key: "request_count", + options: [tags: ["http_host:www.example.com", "http_method:GET", + "http_port:80", "api_name:Montoring Test api", + "api_id:" <> _, + "request_id:" <> _]], + type: :counter, + value: "1" + }, + %{ + key: "request_size", + options: [tags: ["http_host:www.example.com", "http_method:GET", + "http_port:80", "api_name:Montoring Test api", + "api_id:" <> _, + "request_id:" <> _]], + type: :histogram, + value: _ + } + ] = sent() end - defp sent(name \\ExStatsD), + defp sent(name \\ Annon.Monitoring.MetricsCollector), do: state(name).sink defp state(name), From b018434ce2171e4c99ea9782b0342d68e4be0c16 Mon Sep 17 00:00:00 2001 From: Andrew Dryga Date: Thu, 11 May 2017 00:22:00 +0300 Subject: [PATCH 3/9] Added docs --- lib/annon_api/monitoring/metrics_collector.ex | 79 ++++++++++++++++++- lib/annon_api/plugins/monitoring.ex | 1 - 2 files changed, 78 insertions(+), 2 deletions(-) diff --git a/lib/annon_api/monitoring/metrics_collector.ex b/lib/annon_api/monitoring/metrics_collector.ex index 522566d54..75d7ab558 100644 --- a/lib/annon_api/monitoring/metrics_collector.ex +++ b/lib/annon_api/monitoring/metrics_collector.ex @@ -7,7 +7,9 @@ defmodule Annon.Monitoring.MetricsCollector do use GenServer alias AnnonMonitoring.MetricsCollector.Packet - defstruct [:sock, :header, :config] + @type key :: 
iodata + @type options :: [sample_rate: float, tags: [String.t]] + @type on_send :: :ok | {:error, term} @doc """ Starts a metric collector process. @@ -41,26 +43,101 @@ defmodule Annon.Monitoring.MetricsCollector do }} end + @doc """ + Increments the StatsD counter identified by `key` by the given `value`. + + `value` is supposed to be zero or positive and `c:decrement/3` should be + used for negative values. + + ## Examples + + iex> increment("hits", 1, []) + :ok + + """ + @spec increment(key, value :: number, options) :: on_send def increment(key, val \\ 1, options \\ []) when is_number(val) do transmit(:counter, key, val, options) end + @doc """ + Decrements the StatsD counter identified by `key` by the given `value`. + + Works same as `c:increment/3` but subtracts `value` instead of adding it. For + this reason `value` should be zero or negative. + + ## Examples + + iex> decrement("open_connections", 1, []) + :ok + + """ + @spec decrement(key, value :: number, options) :: on_send def decrement(key, val \\ 1, options \\ []) when is_number(val) do transmit(:counter, key, [?-, to_string(val)], options) end + @doc """ + Writes to the StatsD gauge identified by `key`. + + ## Examples + + iex> gauge("cpu_usage", 0.83, []) + :ok + + """ + @spec gauge(key, value :: String.Chars.t, options) :: on_send def gauge(key, val, options \\ [] ) do transmit(:gauge, key, val, options) end + @doc """ + Writes `value` to the histogram identified by `key`. + + Not all StatsD-compatible servers support histograms. An example of a such + server [statsite](https://github.com/statsite/statsite). + + ## Examples + + iex> histogram("online_users", 123, []) + :ok + + """ + @spec histogram(key, value :: String.Chars.t, options) :: on_send def histogram(key, val, options \\ []) do transmit(:histogram, key, val, options) end + @doc """ + Writes the given `value` to the StatsD timing identified by `key`. + + `value` is expected in milliseconds. 
+ + ## Examples + + iex> timing("rendering", 12, []) + :ok + + """ + @spec timing(key, value :: String.Chars.t, options) :: on_send def timing(key, val, options \\ []) do transmit(:timing, key, val, options) end + @doc """ + Writes the given `value` to the StatsD set identified by `key`. + + ## Examples + + iex> set("unique_visitors", "user1", []) + :ok + + """ + @spec set(key, value :: String.Chars.t, options) :: on_send + def set(key, val, options \\ []) do + transmit(:set, key, val, options) + end + @doc false def transmit(type, key, val, options) when (is_binary(key) or is_list(key)) and is_list(options) do sample_rate = Keyword.get(options, :sample_rate) diff --git a/lib/annon_api/plugins/monitoring.ex b/lib/annon_api/plugins/monitoring.ex index 4679181e5..f7eec8541 100644 --- a/lib/annon_api/plugins/monitoring.ex +++ b/lib/annon_api/plugins/monitoring.ex @@ -19,7 +19,6 @@ defmodule Annon.Plugins.Monitoring do def execute(%Conn{} = conn, %{api: api, start_time: request_start_time}, _settings) do api_tags = tags(conn, api) - MetricsCollector.histogram("request_size", get_request_size(conn), tags: api_tags) MetricsCollector.increment("request_count", 1, tags: api_tags) From 11dba5b9b39ac0f116e27d3bfa44e16779c181db Mon Sep 17 00:00:00 2001 From: Andrew Dryga Date: Thu, 11 May 2017 16:01:31 +0300 Subject: [PATCH 4/9] Move metric collection out to a separate package and refactor Monitoring plugin --- config/test.exs | 3 +- lib/annon_api/application.ex | 2 +- lib/annon_api/monitoring/latencies.ex | 8 + lib/annon_api/monitoring/metrics_collector.ex | 179 ++---------------- .../monitoring/metrics_collector/packet.ex | 50 ----- lib/annon_api/plugins/monitoring.ex | 106 ++++++----- mix.exs | 3 +- mix.lock | 1 + test/unit/plugins/monitoring_test.exs | 68 +++---- 9 files changed, 114 insertions(+), 306 deletions(-) create mode 100644 lib/annon_api/monitoring/latencies.ex delete mode 100644 lib/annon_api/monitoring/metrics_collector/packet.ex diff --git a/config/test.exs 
b/config/test.exs index 45d78423a..34dae07fb 100644 --- a/config/test.exs +++ b/config/test.exs @@ -10,7 +10,8 @@ config :annon_api, Annon.Requests.Repo, config :annon_api, :metrics_collector, sink: [], - namespace: "test" + namespace: "test", + sample_rate: 1 config :annon_api, :acceptance, management: [ diff --git a/lib/annon_api/application.ex b/lib/annon_api/application.ex index df97cbe25..0266f1eab 100644 --- a/lib/annon_api/application.ex +++ b/lib/annon_api/application.ex @@ -15,7 +15,7 @@ defmodule Annon do children = [ supervisor(Annon.Configuration.Repo, []), supervisor(Annon.Requests.Repo, []), - worker(Annon.Monitoring.MetricsCollector, [metrics_collector_opts()]), + worker(DogStat, [metrics_collector_opts()]), worker(Annon.Configuration.Matcher, [matcher_opts()]), worker(Annon.AutoClustering, []), management_endpoint_spec(), diff --git a/lib/annon_api/monitoring/latencies.ex b/lib/annon_api/monitoring/latencies.ex new file mode 100644 index 000000000..c79330072 --- /dev/null +++ b/lib/annon_api/monitoring/latencies.ex @@ -0,0 +1,8 @@ +defmodule Annon.Monitoring.Latencies do + @moduledoc false + + defstruct request_id: nil, + client: nil, + upstream: nil, + gateway: nil +end diff --git a/lib/annon_api/monitoring/metrics_collector.ex b/lib/annon_api/monitoring/metrics_collector.ex index 75d7ab558..ca3911c7e 100644 --- a/lib/annon_api/monitoring/metrics_collector.ex +++ b/lib/annon_api/monitoring/metrics_collector.ex @@ -4,180 +4,29 @@ defmodule Annon.Monitoring.MetricsCollector do Code is based on [Statix](https://github.com/lexmag/statix) library. """ - use GenServer - alias AnnonMonitoring.MetricsCollector.Packet + import DogStat + alias Annon.Monitoring.Latencies - @type key :: iodata - @type options :: [sample_rate: float, tags: [String.t]] - @type on_send :: :ok | {:error, term} - - @doc """ - Starts a metric collector process. 
- - `conn_opts` accepts connection arg - """ - def start_link(conn_opts) do - GenServer.start_link(__MODULE__, conn_opts, name: __MODULE__) + def track_request(_request_id, nil, opts), + do: increment("request_count", 1, opts) + def track_request(_request_id, content_length, opts) do + increment("request_count", 1, opts) + histogram("request_size", content_length, opts) end - @doc false - def init(conn_opts) do - enabled? = Keyword.get(conn_opts, :enabled?, true) - host = conn_opts |> Keyword.get(:host, "127.0.0.1") |> String.to_char_list() - port = Keyword.get(conn_opts, :port, 8125) - sink = Keyword.get(conn_opts, :sink, nil) - namespace = Keyword.get(conn_opts, :namespace, nil) - send_tags? = Keyword.get(conn_opts, :send_tags?, true) - - {:ok, address} = :inet.getaddr(host, :inet) - header = Packet.header(address, port) + def track_response(_request_id, latencies, opts) do + %Latencies{client: client, upstream: upstream, gateway: gateway} = latencies - {:ok, socket} = :gen_udp.open(0, [active: false]) - - {:ok, %{ - enabled?: enabled?, - send_tags?: send_tags?, - header: [header | "#{namespace}."], - socket: socket, - sink: sink - }} + histogram("latencies_client", client, opts) + histogram("latencies_upstream", upstream, opts) + histogram("latencies_gateway", gateway, opts) end - @doc """ - Increments the StatsD counter identified by `key` by the given `value`. - - `value` is supposed to be zero or positive and `c:decrement/3` should be - used for negative values. - - ## Examples + def track_repo_activity() do - iex> increment("hits", 1, []) - :ok - - """ - @spec increment(key, value :: number, options) :: on_send - def increment(key, val \\ 1, options \\ []) when is_number(val) do - transmit(:counter, key, val, options) end - @doc """ - Decrements the StatsD counter identified by `key` by the given `value`. - - Works same as `c:increment/3` but subtracts `value` instead of adding it. For - this reason `value` should be zero or negative. 
- - ## Examples - - iex> decrement("open_connections", 1, []) - :ok - - """ - @spec decrement(key, value :: number, options) :: on_send - def decrement(key, val \\ 1, options \\ []) when is_number(val) do - transmit(:counter, key, [?-, to_string(val)], options) - end - - @doc """ - Writes to the StatsD gauge identified by `key`. - - ## Examples - - iex> gauge("cpu_usage", 0.83, []) - :ok - - """ - @spec gauge(key, value :: String.Chars.t, options) :: on_send - def gauge(key, val, options \\ [] ) do - transmit(:gauge, key, val, options) - end - - @doc """ - Writes `value` to the histogram identified by `key`. - - Not all StatsD-compatible servers support histograms. An example of a such - server [statsite](https://github.com/statsite/statsite). - - ## Examples - - iex> histogram("online_users", 123, []) - :ok - - """ - @spec histogram(key, value :: String.Chars.t, options) :: on_send - def histogram(key, val, options \\ []) do - transmit(:histogram, key, val, options) - end - - @doc """ - Writes the given `value` to the StatsD timing identified by `key`. - - `value` is expected in milliseconds. - - ## Examples - - iex> timing("rendering", 12, []) - :ok - - """ - @spec timing(key, value :: String.Chars.t, options) :: on_send - def timing(key, val, options \\ []) do - transmit(:timing, key, val, options) - end - - @doc """ - Writes the given `value` to the StatsD set identified by `key`. 
- - ## Examples - - iex> set("unique_visitors", "user1", []) - :ok - - """ - @spec set(key, value :: String.Chars.t, options) :: on_send - def set(key, val, options \\ []) do - transmit(:set, key, val, options) - end - - @doc false - def transmit(type, key, val, options) when (is_binary(key) or is_list(key)) and is_list(options) do - sample_rate = Keyword.get(options, :sample_rate) - - if is_nil(sample_rate) or sample_rate >= :rand.uniform() do - GenServer.cast(__MODULE__, {:transmit, type, key, to_string(val), options}) - end - - :ok - end - - @doc false - def handle_cast({:transmit, _type, _key, _value, _options}, %{enabled?: false} = state), - do: {:noreply, state} - - # Transmits message to a sink - @doc false - def handle_cast({:transmit, type, key, value, options}, %{sink: sink} = state) when is_list(sink) do - %{header: header} = state - packet = %{type: type, key: key, value: value, options: options, header: header} - {:noreply, %{state | sink: [packet | sink]}} - end - - # Transmits message to a StatsD server - @doc false - def handle_cast({:transmit, type, key, value, options}, state) do - %{header: header, socket: socket, send_tags?: send_tags?} = state - - packet = Packet.build(header, type, key, value, send_tags?, options) - Port.command(socket, packet) - - receive do - {:inet_reply, _port, status} -> status - end - - {:noreply, state} - end + def track_latency(name, value, tags) do - @doc false - def handle_call(:flush, _from, state) do - {:reply, :ok, state} end end diff --git a/lib/annon_api/monitoring/metrics_collector/packet.ex b/lib/annon_api/monitoring/metrics_collector/packet.ex deleted file mode 100644 index eeec267c3..000000000 --- a/lib/annon_api/monitoring/metrics_collector/packet.ex +++ /dev/null @@ -1,50 +0,0 @@ -defmodule AnnonMonitoring.MetricsCollector.Packet do - @moduledoc false - use Bitwise - - otp_release = :erlang.system_info(:otp_release) - @addr_family if(otp_release >= '19', do: [1], else: []) - - def header({n1, n2, n3, n4}, 
port) do - @addr_family ++ [ - band(bsr(port, 8), 0xFF), - band(port, 0xFF), - band(n1, 0xFF), - band(n2, 0xFF), - band(n3, 0xFF), - band(n4, 0xFF) - ] - end - - def build(header, name, key, val, send_tags?, options) do - [header, key, ?:, val, ?|, metric_type(name)] - |> set_option(:sample_rate, options[:sample_rate]) - |> set_option(:tags, options[:tags], send_tags?) - end - - metrics = %{ - counter: "c", - gauge: "g", - histogram: "h", - timing: "ms", - set: "s" - } - - for {name, type} <- metrics do - defp metric_type(unquote(name)), do: unquote(type) - end - - defp set_option(packet, _kind, _sample_rate, _send_tags? \\ nil) - - defp set_option(packet, _kind, nil, _send_tags?), - do: packet - - defp set_option(packet, :sample_rate, sample_rate, _send_tags?) when is_float(sample_rate), - do: [packet | ["|@", :erlang.float_to_binary(sample_rate, [:compact, decimals: 2])]] - - defp set_option(packet, :tags, tags, false) when is_list(tags), - do: packet - - defp set_option(packet, :tags, tags, true) when is_list(tags), - do: [packet | ["|#", Enum.join(tags, ",")]] -end diff --git a/lib/annon_api/plugins/monitoring.ex b/lib/annon_api/plugins/monitoring.ex index f7eec8541..fc44e300b 100644 --- a/lib/annon_api/plugins/monitoring.ex +++ b/lib/annon_api/plugins/monitoring.ex @@ -9,6 +9,7 @@ defmodule Annon.Plugins.Monitoring do use Annon.Plugin, plugin_name: :monitoring alias Plug.Conn alias Annon.Monitoring.MetricsCollector + alias Annon.Monitoring.Latencies def validate_settings(changeset), do: changeset @@ -17,72 +18,83 @@ defmodule Annon.Plugins.Monitoring do do: %{} def execute(%Conn{} = conn, %{api: api, start_time: request_start_time}, _settings) do - api_tags = tags(conn, api) + content_length = get_content_length(conn, nil) - MetricsCollector.histogram("request_size", get_request_size(conn), tags: api_tags) - MetricsCollector.increment("request_count", 1, tags: api_tags) + sample_rate = + :annon_api + |> Application.get_env(:metrics_collector) + |> 
Keyword.get(:sample_rate, 1) + |> Confex.process_env() + + collector_opts = [ + tags: tags(conn, api), + sample_rate: sample_rate + ] + + request_id = get_request_id(conn, nil) + + MetricsCollector.track_request(request_id, content_length, collector_opts) conn - |> Conn.register_before_send(&write_metrics(&1, api)) - |> Conn.register_before_send(&assign_latencies(&1, request_start_time)) + |> Conn.register_before_send(&track_latencies(&1, request_id, request_start_time, collector_opts)) end - defp assign_latencies(conn, request_start_time) do + defp track_latencies(conn, request_id, request_start_time, collector_opts) do request_end_time = System.monotonic_time() latencies_client = System.convert_time_unit(request_end_time - request_start_time, :native, :micro_seconds) - request_duration = latencies_client - Map.get(conn.assigns, :latencies_upstream, 0) + latencies_upstream = Map.get(conn.assigns, :latencies_upstream, 0) + latencies_gateway = latencies_client - latencies_upstream - conn - |> Conn.assign(:latencies_gateway, request_duration) - |> Conn.assign(:latencies_client, latencies_client) - end + latencies = %Latencies{ + client: latencies_client, + upstream: latencies_upstream, + gateway: latencies_gateway + } - defp write_metrics(%Conn{} = conn, api) do - api_tags = tags(conn, api) ++ ["http_status:#{to_string conn.status}"] - MetricsCollector.timing("latency", conn.assigns.latencies_client, tags: api_tags) - MetricsCollector.increment("response_count", 1, tags: api_tags) - conn - end + status = conn |> get_conn_status(0) |> Integer.to_string() - defp tags(%Conn{host: host, method: method, port: port} = conn, api), - do: ["http_host:#{to_string host}", - "http_method:#{to_string method}", - "http_port:#{to_string port}"] ++ api_tags(api) ++ get_request_id(conn) + MetricsCollector.track_response(request_id, latencies, [ + tags: ["http_status:#{status}"] ++ collector_opts[:tags], + sample_rate: collector_opts[:sample_rate] + ]) - defp api_tags(%{name: 
api_name, id: api_id}), - do: ["api_name:#{to_string api_name}", "api_id:#{to_string api_id}"] - defp api_tags(_), - do: ["api_name:unknown", "api_id:unknown"] + conn + |> Conn.assign(:latencies_gateway, latencies_gateway) + |> Conn.assign(:latencies_client, latencies_client) + end - defp get_request_id(conn) do - id = conn - |> Conn.get_resp_header("x-request-id") - |> Enum.at(0) + defp tags(%Conn{host: host, method: method, port: port} = conn, nil) do + port = Integer.to_string(port) + request_id = get_request_id(conn, "unknown") - ["request_id:#{to_string id}"] + ["http_host:#{host}", "http_method:#{method}", "http_port:#{port}", + "api_name:unknown", "api_id:unknown", "request_id:#{request_id}"] end + defp tags(%Conn{host: host, method: method, port: port} = conn, api) do + port = Integer.to_string(port) + request_id = get_request_id(conn, "unknown") + %{id: api_id, name: api_name} = api - defp get_request_size(conn) do - get_headers_size(conn) + get_body_size(conn) + get_query_string_size(conn) + ["http_host:#{host}", "http_method:#{method}", "http_port:#{port}", + "api_name:#{api_name}", "api_id:#{api_id}", "request_id:#{request_id}"] end - defp get_headers_size(%Conn{req_headers: req_headers}) do - req_headers - |> Enum.map(&Tuple.to_list(&1)) - |> List.flatten - |> Enum.join - |> byte_size + defp get_request_id(conn, default) do + case Conn.get_resp_header(conn, "x-request-id") do + [] -> default + [id | _] -> id + end end - defp get_body_size(conn) do - conn - |> Conn.read_body - |> elem(1) - |> byte_size + defp get_content_length(conn, default) do + case Conn.get_resp_header(conn, "content-length") do + [] -> default + [id | _] -> id + end end - defp get_query_string_size(%Conn{query_string: query_string}) do - query_string - |> byte_size - end + def get_conn_status(%{status: nil}, default), + do: default + def get_conn_status(%{status: status}, _default), + do: status end diff --git a/mix.exs b/mix.exs index 48a7973b1..a9ba5fea5 100644 --- a/mix.exs +++ 
b/mix.exs @@ -26,7 +26,7 @@ defmodule Annon.Mixfile do def application do [ extra_applications: [:logger, :logger_json, :confex, :cowboy, :plug, :postgrex, :ecto, :joken, - :nex_json_schema, :poison, :httpoison, :skycluster, :eview, + :nex_json_schema, :poison, :httpoison, :skycluster, :eview, :dogstat, :ecto_paging, :runtime_tools], mod: {Annon, []} ] @@ -66,6 +66,7 @@ defmodule Annon.Mixfile do {:phoenix_ecto, ">= 0.0.0"}, {:logger_json, "~> 0.4.0"}, {:cors_plug, "~> 1.1"}, + {:dogstat, "~> 0.1.0"}, {:ex_machina, ">= 1.0.0", only: [:dev, :test]}, {:dogma, ">= 0.0.0", only: [:dev, :test]}, {:benchfella, "~> 0.3", only: [:dev, :test]}, diff --git a/mix.lock b/mix.lock index 4f7073aa0..0ed63c759 100644 --- a/mix.lock +++ b/mix.lock @@ -12,6 +12,7 @@ "decimal": {:hex, :decimal, "1.3.1", "157b3cedb2bfcb5359372a7766dd7a41091ad34578296e951f58a946fcab49c6", [:mix], []}, "distillery": {:hex, :distillery, "1.3.5", "d13d3a5282c8e7b2128383e9b4491dac39dc9ab37b43fc000f10e687057cfacd", [:mix], []}, "dogma": {:hex, :dogma, "0.1.15", "5bceba9054b2b97a4adcb2ab4948ca9245e5258b883946e82d32f785340fd411", [:mix], [{:poison, ">= 2.0.0", [hex: :poison, optional: false]}]}, + "dogstat": {:hex, :dogstat, "0.1.0", "9c59f5673fde8cacaa6e57e79853ac9a2d9c4938eb2d8d136423782ca096ae3e", [:mix], []}, "earmark": {:hex, :earmark, "1.2.1", "7ad3f203ab84d31832814483c834e006cf88949f061a4b50d7e783147572280f", [:mix], []}, "ecto": {:hex, :ecto, "2.1.4", "d1ba932813ec0e0d9db481ef2c17777f1cefb11fc90fa7c142ff354972dfba7e", [:mix], [{:db_connection, "~> 1.1", [hex: :db_connection, optional: true]}, {:decimal, "~> 1.2", [hex: :decimal, optional: false]}, {:mariaex, "~> 0.8.0", [hex: :mariaex, optional: true]}, {:poison, "~> 2.2 or ~> 3.0", [hex: :poison, optional: true]}, {:poolboy, "~> 1.5", [hex: :poolboy, optional: false]}, {:postgrex, "~> 0.13.0", [hex: :postgrex, optional: true]}, {:sbroker, "~> 1.0", [hex: :sbroker, optional: true]}]}, "ecto_paging": {:hex, :ecto_paging, "0.6.2", 
"7d620ee7f6bd713439cfb706b9163149f59fb1898680a6fa4c30c970ef8286b8", [:mix], [{:ecto, ">= 2.1.0", [hex: :ecto, optional: false]}, {:postgrex, "~> 0.13.0", [hex: :postgrex, optional: true]}]}, diff --git a/test/unit/plugins/monitoring_test.exs b/test/unit/plugins/monitoring_test.exs index 35c9dc3ac..825b13871 100644 --- a/test/unit/plugins/monitoring_test.exs +++ b/test/unit/plugins/monitoring_test.exs @@ -3,7 +3,7 @@ defmodule Annon.Plugins.MonitoringTest do use Annon.UnitCase setup do - :sys.replace_state Annon.Monitoring.MetricsCollector, fn state -> + :sys.replace_state DogStat, fn state -> Map.update!(state, :sink, fn _prev_state -> [] end) end end @@ -26,49 +26,35 @@ defmodule Annon.Plugins.MonitoringTest do } }) - "/apis" - |> call_public_router() + call_public_router("/apis") - [ - %{ - key: "response_count", - options: [tags: ["http_host:www.example.com", "http_method:GET", - "http_port:80", "api_name:Montoring Test api", - "api_id:" <> _, - "request_id:" <> _, "http_status:200"]], - type: :counter, - value: "1" - }, - %{ - key: "latency", - options: [tags: ["http_host:www.example.com", "http_method:GET", - "http_port:80", "api_name:Montoring Test api", - "api_id:" <> _, - "request_id:" <> _, "http_status:200"]], - type: :timing, value: _ - }, - %{ - key: "request_count", - options: [tags: ["http_host:www.example.com", "http_method:GET", - "http_port:80", "api_name:Montoring Test api", - "api_id:" <> _, - "request_id:" <> _]], - type: :counter, - value: "1" - }, - %{ - key: "request_size", - options: [tags: ["http_host:www.example.com", "http_method:GET", - "http_port:80", "api_name:Montoring Test api", - "api_id:" <> _, - "request_id:" <> _]], - type: :histogram, - value: _ - } - ] = sent() + [%{header: [_, "test", 46], key: "latencies_gateway", + options: [tags: ["http_status:200", "http_host:www.example.com", + "http_method:GET", "http_port:80", "api_name:Montoring Test api", + "api_id:" <> _, + "request_id:" <> _], sample_rate: 1], + type: :histogram, 
value: _}, + %{header: [_, "test", 46], key: "latencies_upstream", + options: [tags: ["http_status:200", "http_host:www.example.com", + "http_method:GET", "http_port:80", "api_name:Montoring Test api", + "api_id:" <> _, + "request_id:" <> _], sample_rate: 1], + type: :histogram, value: _}, + %{header: [_, "test", 46], key: "latencies_client", + options: [tags: ["http_status:200", "http_host:www.example.com", + "http_method:GET", "http_port:80", "api_name:Montoring Test api", + "api_id:" <> _, + "request_id:" <> _], sample_rate: 1], + type: :histogram, value: _}, + %{header: [_, "test", 46], key: "request_count", + options: [tags: ["http_host:www.example.com", "http_method:GET", + "http_port:80", "api_name:Montoring Test api", + "api_id:" <> _, + "request_id:" <> _], sample_rate: 1], + type: :counter, value: "1"}] = sent() end - defp sent(name \\ Annon.Monitoring.MetricsCollector), + defp sent(name \\ DogStat), do: state(name).sink defp state(name), From 283542d5b18ba93a7f5d91e643d111c8dbda734e Mon Sep 17 00:00:00 2001 From: Andrew Dryga Date: Thu, 11 May 2017 16:02:11 +0300 Subject: [PATCH 5/9] Assign latencies struct --- lib/annon_api/monitoring/latencies.ex | 3 +-- lib/annon_api/plugins/monitoring.ex | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/annon_api/monitoring/latencies.ex b/lib/annon_api/monitoring/latencies.ex index c79330072..12306ca6c 100644 --- a/lib/annon_api/monitoring/latencies.ex +++ b/lib/annon_api/monitoring/latencies.ex @@ -1,8 +1,7 @@ defmodule Annon.Monitoring.Latencies do @moduledoc false - defstruct request_id: nil, - client: nil, + defstruct client: nil, upstream: nil, gateway: nil end diff --git a/lib/annon_api/plugins/monitoring.ex b/lib/annon_api/plugins/monitoring.ex index fc44e300b..f8dc81f52 100644 --- a/lib/annon_api/plugins/monitoring.ex +++ b/lib/annon_api/plugins/monitoring.ex @@ -61,6 +61,7 @@ defmodule Annon.Plugins.Monitoring do conn |> Conn.assign(:latencies_gateway, latencies_gateway) |> 
Conn.assign(:latencies_client, latencies_client) + |> Conn.assign(:latencies, latencies) end defp tags(%Conn{host: host, method: method, port: port} = conn, nil) do From 461204ddc30b7779c12cf5761b54e3b81d44fe45 Mon Sep 17 00:00:00 2001 From: Andrew Dryga Date: Thu, 11 May 2017 16:04:33 +0300 Subject: [PATCH 6/9] Reuse latencies struct --- lib/annon_api/monitoring/latencies.ex | 2 +- lib/annon_api/monitoring/metrics_collector.ex | 2 +- lib/annon_api/plugins/logger.ex | 7 ++----- lib/annon_api/plugins/monitoring.ex | 2 +- 4 files changed, 5 insertions(+), 8 deletions(-) diff --git a/lib/annon_api/monitoring/latencies.ex b/lib/annon_api/monitoring/latencies.ex index 12306ca6c..a73dc5497 100644 --- a/lib/annon_api/monitoring/latencies.ex +++ b/lib/annon_api/monitoring/latencies.ex @@ -1,7 +1,7 @@ defmodule Annon.Monitoring.Latencies do @moduledoc false - defstruct client: nil, + defstruct client_request: nil, upstream: nil, gateway: nil end diff --git a/lib/annon_api/monitoring/metrics_collector.ex b/lib/annon_api/monitoring/metrics_collector.ex index ca3911c7e..7f9f82d19 100644 --- a/lib/annon_api/monitoring/metrics_collector.ex +++ b/lib/annon_api/monitoring/metrics_collector.ex @@ -15,7 +15,7 @@ defmodule Annon.Monitoring.MetricsCollector do end def track_response(_request_id, latencies, opts) do - %Latencies{client: client, upstream: upstream, gateway: gateway} = latencies + %Latencies{client_request: client, upstream: upstream, gateway: gateway} = latencies histogram("latencies_client", client, opts) histogram("latencies_upstream", upstream, opts) diff --git a/lib/annon_api/plugins/logger.ex b/lib/annon_api/plugins/logger.ex index 274245ca6..c06632be8 100644 --- a/lib/annon_api/plugins/logger.ex +++ b/lib/annon_api/plugins/logger.ex @@ -97,11 +97,8 @@ defmodule Annon.Plugins.Logger do end defp get_latencies_data(conn) do - %{ - gateway: Map.get(conn.assigns, :latencies_gateway), - upstream: Map.get(conn.assigns, :latencies_upstream), - client_request: 
Map.get(conn.assigns, :latencies_client) - } + conn.assigns + |> Map.get(:latencies) |> prepare_params end diff --git a/lib/annon_api/plugins/monitoring.ex b/lib/annon_api/plugins/monitoring.ex index f8dc81f52..1a5b926ec 100644 --- a/lib/annon_api/plugins/monitoring.ex +++ b/lib/annon_api/plugins/monitoring.ex @@ -46,7 +46,7 @@ defmodule Annon.Plugins.Monitoring do latencies_gateway = latencies_client - latencies_upstream latencies = %Latencies{ - client: latencies_client, + client_request: latencies_client, upstream: latencies_upstream, gateway: latencies_gateway } From b99e5fa66b267fd2e8fb1506ceca9e735c0c24c7 Mon Sep 17 00:00:00 2001 From: Andrew Dryga Date: Thu, 11 May 2017 16:34:24 +0300 Subject: [PATCH 7/9] Use OpenTracing style for tag names --- lib/annon_api/plugins/monitoring.ex | 10 ++++----- test/unit/plugins/monitoring_test.exs | 32 +++++++++++++-------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/lib/annon_api/plugins/monitoring.ex b/lib/annon_api/plugins/monitoring.ex index 1a5b926ec..901a9fa53 100644 --- a/lib/annon_api/plugins/monitoring.ex +++ b/lib/annon_api/plugins/monitoring.ex @@ -54,7 +54,7 @@ defmodule Annon.Plugins.Monitoring do status = conn |> get_conn_status(0) |> Integer.to_string() MetricsCollector.track_response(request_id, latencies, [ - tags: ["http_status:#{status}"] ++ collector_opts[:tags], + tags: ["http.status:#{status}"] ++ collector_opts[:tags], sample_rate: collector_opts[:sample_rate] ]) @@ -68,16 +68,16 @@ defmodule Annon.Plugins.Monitoring do port = Integer.to_string(port) request_id = get_request_id(conn, "unknown") - ["http_host:#{host}", "http_method:#{method}", "http_port:#{port}", - "api_name:unknown", "api_id:unknown", "request_id:#{request_id}"] + ["http.host:#{host}", "http.method:#{method}", "http.port:#{port}", + "api.name:unknown", "api.id:unknown", "request.id:#{request_id}"] end defp tags(%Conn{host: host, method: method, port: port} = conn, api) do port = Integer.to_string(port) 
request_id = get_request_id(conn, "unknown") %{id: api_id, name: api_name} = api - ["http_host:#{host}", "http_method:#{method}", "http_port:#{port}", - "api_name:#{api_name}", "api_id:#{api_id}", "request_id:#{request_id}"] + ["http.host:#{host}", "http.method:#{method}", "http.port:#{port}", + "api.name:#{api_name}", "api.id:#{api_id}", "request.id:#{request_id}"] end defp get_request_id(conn, default) do diff --git a/test/unit/plugins/monitoring_test.exs b/test/unit/plugins/monitoring_test.exs index 825b13871..b3fba42b1 100644 --- a/test/unit/plugins/monitoring_test.exs +++ b/test/unit/plugins/monitoring_test.exs @@ -29,28 +29,28 @@ defmodule Annon.Plugins.MonitoringTest do call_public_router("/apis") [%{header: [_, "test", 46], key: "latencies_gateway", - options: [tags: ["http_status:200", "http_host:www.example.com", - "http_method:GET", "http_port:80", "api_name:Montoring Test api", - "api_id:" <> _, - "request_id:" <> _], sample_rate: 1], + options: [tags: ["http.status:200", "http.host:www.example.com", + "http.method:GET", "http.port:80", "api.name:Montoring Test api", + "api.id:" <> _, + "request.id:" <> _], sample_rate: 1], type: :histogram, value: _}, %{header: [_, "test", 46], key: "latencies_upstream", - options: [tags: ["http_status:200", "http_host:www.example.com", - "http_method:GET", "http_port:80", "api_name:Montoring Test api", - "api_id:" <> _, - "request_id:" <> _], sample_rate: 1], + options: [tags: ["http.status:200", "http.host:www.example.com", + "http.method:GET", "http.port:80", "api.name:Montoring Test api", + "api.id:" <> _, + "request.id:" <> _], sample_rate: 1], type: :histogram, value: _}, %{header: [_, "test", 46], key: "latencies_client", - options: [tags: ["http_status:200", "http_host:www.example.com", - "http_method:GET", "http_port:80", "api_name:Montoring Test api", - "api_id:" <> _, - "request_id:" <> _], sample_rate: 1], + options: [tags: ["http.status:200", "http.host:www.example.com", + "http.method:GET", 
"http.port:80", "api.name:Montoring Test api", + "api.id:" <> _, + "request.id:" <> _], sample_rate: 1], type: :histogram, value: _}, %{header: [_, "test", 46], key: "request_count", - options: [tags: ["http_host:www.example.com", "http_method:GET", - "http_port:80", "api_name:Montoring Test api", - "api_id:" <> _, - "request_id:" <> _], sample_rate: 1], + options: [tags: ["http.host:www.example.com", "http.method:GET", + "http.port:80", "api.name:Montoring Test api", + "api.id:" <> _, + "request.id:" <> _], sample_rate: 1], type: :counter, value: "1"}] = sent() end From d9efa110a802785acce737164b9ebfe1177c20e7 Mon Sep 17 00:00:00 2001 From: Andrew Dryga Date: Thu, 11 May 2017 16:35:11 +0300 Subject: [PATCH 8/9] Drop unused code --- lib/annon_api/monitoring/metrics_collector.ex | 8 -------- 1 file changed, 8 deletions(-) diff --git a/lib/annon_api/monitoring/metrics_collector.ex b/lib/annon_api/monitoring/metrics_collector.ex index 7f9f82d19..bd2962aca 100644 --- a/lib/annon_api/monitoring/metrics_collector.ex +++ b/lib/annon_api/monitoring/metrics_collector.ex @@ -21,12 +21,4 @@ defmodule Annon.Monitoring.MetricsCollector do histogram("latencies_upstream", upstream, opts) histogram("latencies_gateway", gateway, opts) end - - def track_repo_activity() do - - end - - def track_latency(name, value, tags) do - - end end From 08dadbdfecf27db1d6da619c87dca413ab91780b Mon Sep 17 00:00:00 2001 From: Andrew Dryga Date: Fri, 12 May 2017 01:01:27 +0300 Subject: [PATCH 9/9] Send spans POC --- lib/annon_api/monitoring/trace.ex | 63 +++++++++++++++++++ lib/annon_api/monitoring/trace/annotation.ex | 5 ++ .../monitoring/trace/binary_annotation.ex | 5 ++ lib/annon_api/monitoring/trace/endpoint.ex | 6 ++ lib/annon_api/monitoring/trace_collector.ex | 17 +++++ lib/annon_api/plugins/monitoring.ex | 2 + 6 files changed, 98 insertions(+) create mode 100644 lib/annon_api/monitoring/trace.ex create mode 100644 lib/annon_api/monitoring/trace/annotation.ex create mode 100644 
lib/annon_api/monitoring/trace/binary_annotation.ex create mode 100644 lib/annon_api/monitoring/trace/endpoint.ex create mode 100644 lib/annon_api/monitoring/trace_collector.ex diff --git a/lib/annon_api/monitoring/trace.ex b/lib/annon_api/monitoring/trace.ex new file mode 100644 index 000000000..b1d7947ae --- /dev/null +++ b/lib/annon_api/monitoring/trace.ex @@ -0,0 +1,63 @@ +defmodule Annon.Monitoring.Trace do + alias Annon.Monitoring.Trace + alias Annon.Monitoring.Trace.BinaryAnnotation + alias Annon.Monitoring.Trace.Endpoint + alias Plug.Conn + + defstruct traceId: nil, # Randomly generated, unique for a trace, set on all spans within it. 16-32 chars + name: nil, # Span name in lowercase (e.g. rpc method) + parentId: nil, # Parent span id. 8-byte identifier encoded as 16 lowercase hex characters. + # Can be omitted or set to nil if span is the root span of a trace. + id: nil, # Id of current span, unique in context of traceId. + # 8-byte identifier encoded as 16 lowercase hex characters. + timestamp: nil, # Epoch **microseconds** of the start of this span, + # possibly absent if this an incomplete span. + duration: nil, # Duration in **microseconds** of the critical path, if known. + # Durations of less than one are rounded up. 
+ debug: false, + annotations: [], + binaryAnnotations: [] + + def start_span(%Conn{} = conn, opts \\ []) do + request_id = get_request_id(conn, Ecto.UUID.generate()) + timestamp = System.monotonic_time() |> System.convert_time_unit(:native, :microseconds) + endpoint = nil + + annotations = + opts + |> Keyword.get(:annotations, []) + |> Enum.map(fn {key, value} -> %BinaryAnnotation{key: key, value: value, endpoint: endpoint} end) + + %Trace{ + traceId: request_id, + name: "gateway request", + id: Ecto.UUID.generate(), + timestamp: timestamp, + binaryAnnotations: annotations + } + end + + def end_span(%Trace{} = trace, opts \\ []) do + duration = System.convert_time_unit(System.monotonic_time(), :native, :microseconds) - trace.timestamp + endpoint = nil + + annotations = + opts + |> Keyword.get(:annotations, []) + |> Enum.reduce(trace.annotations, fn {key, value}, annotations -> + [%BinaryAnnotation{key: key, value: value, endpoint: endpoint}] ++ annotations + end) + + %{trace | + duration: duration, + annotations: annotations + } + end + + defp get_request_id(conn, default) do + case Conn.get_resp_header(conn, "x-request-id") do + [] -> default + [id | _] -> id + end + end +end diff --git a/lib/annon_api/monitoring/trace/annotation.ex b/lib/annon_api/monitoring/trace/annotation.ex new file mode 100644 index 000000000..ba0fbeae5 --- /dev/null +++ b/lib/annon_api/monitoring/trace/annotation.ex @@ -0,0 +1,5 @@ +defmodule Annon.Monitoring.Trace.Annotation do + defstruct timestamp: nil, + value: nil, + endpoint: %Annon.Monitoring.Trace.Endpoint{} +end diff --git a/lib/annon_api/monitoring/trace/binary_annotation.ex b/lib/annon_api/monitoring/trace/binary_annotation.ex new file mode 100644 index 000000000..9b3307860 --- /dev/null +++ b/lib/annon_api/monitoring/trace/binary_annotation.ex @@ -0,0 +1,5 @@ +defmodule Annon.Monitoring.Trace.BinaryAnnotation do + defstruct key: nil, + value: nil, + endpoint: %Annon.Monitoring.Trace.Endpoint{} +end diff --git 
a/lib/annon_api/monitoring/trace/endpoint.ex b/lib/annon_api/monitoring/trace/endpoint.ex new file mode 100644 index 000000000..564e6db53 --- /dev/null +++ b/lib/annon_api/monitoring/trace/endpoint.ex @@ -0,0 +1,6 @@ +defmodule Annon.Monitoring.Trace.Endpoint do + defstruct serviceName: nil, # Classifier of this endpoint in lowercase, such as "acme-front-end" + ipv4: nil, # The text representation of a IPv4 address associated with this endpoint. Ex. 192.168.99.100 + ipv6: nil, # The text representation of a IPv6 address associated with this endpoint. Ex. 2001:db8::c001 + port: nil +end diff --git a/lib/annon_api/monitoring/trace_collector.ex b/lib/annon_api/monitoring/trace_collector.ex new file mode 100644 index 000000000..f87b2dce6 --- /dev/null +++ b/lib/annon_api/monitoring/trace_collector.ex @@ -0,0 +1,17 @@ +defmodule Annon.Monitoring.TraceCollector do + alias Annon.Monitoring.Trace + + def send_span(conn) do + span = + conn + |> Trace.start_span() + |> Trace.end_span() + + spans = Poison.encode!([span]) + + IO.inspect HTTPoison.post!("http://localhost:9411/api/v1/spans", spans, [ + {"content-type", "application/json"}, + {"accept", "application/json"}, + ]) + end +end diff --git a/lib/annon_api/plugins/monitoring.ex b/lib/annon_api/plugins/monitoring.ex index 901a9fa53..139d46686 100644 --- a/lib/annon_api/plugins/monitoring.ex +++ b/lib/annon_api/plugins/monitoring.ex @@ -58,6 +58,8 @@ defmodule Annon.Plugins.Monitoring do sample_rate: collector_opts[:sample_rate] ]) + Annon.Monitoring.TraceCollector.send_span(conn) + conn |> Conn.assign(:latencies_gateway, latencies_gateway) |> Conn.assign(:latencies_client, latencies_client)