diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b2770d9..d240c88 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -7,7 +7,7 @@ jobs: strategy: matrix: os: ['ubuntu-18.04', 'ubuntu-20.04'] - ruby: ['2.6', '2.7', '3.0'] + ruby: ['2.6', '2.7', '3.0', '3.1'] runs-on: ${{ matrix.os }} diff --git a/CHANGELOG.md b/CHANGELOG.md index 8971ebc..5a91e4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [1.1.0 Beta] + +### Added + +Different ways to parse `Socket::Option`. Mainly due to the fact that `#inspect` can't +generate proper data on AWS Fargate, which runs Amazon Linux 2 with 4.14 kernel. So now +besides `#inspect` there's also `#unpack` that parses binary data and picks proper field. + +It depends on the kernel, but new fields are usually added at the end of the `tcp_info` +struct, so it should more or less stay stable. + +You can configure it by passing in `config.socket_parser = :inspect` or +`config.socket_parser = ->(opt) { your implementation }`. + ## [1.1.0 Alpha] ### Added diff --git a/Gemfile.lock b/Gemfile.lock index 0de5231..9c94278 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - puma-plugin-telemetry (1.1.0.alpha) + puma-plugin-telemetry (1.1.0.beta) puma (>= 5.0) GEM diff --git a/README.md b/README.md index a6b342c..f60347f 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,8 @@ Puma::Plugin::Telemetry.configure do |config| config.initial_delay = 10 config.frequency = 30 config.puma_telemetry = %w[workers.requests_count queue.backlog queue.capacity] + config.socket_telemetry! 
+ config.socket_parser = :inspect config.add_target :io, formatter: :json, io: StringIO.new config.add_target :dogstatsd, client: Datadog::Statsd.new(tags: { env: ENV["RAILS_ENV"] }) end diff --git a/lib/puma/plugin/telemetry.rb b/lib/puma/plugin/telemetry.rb index e8c06d3..1023e8c 100644 --- a/lib/puma/plugin/telemetry.rb +++ b/lib/puma/plugin/telemetry.rb @@ -52,7 +52,7 @@ def socket_telemetry(telemetry, launcher) return telemetry if launcher.nil? return telemetry unless config.socket_telemetry? - telemetry.merge! SocketData.new(launcher.binder.ios) + telemetry.merge! SocketData.new(launcher.binder.ios, config.socket_parser) .metrics telemetry diff --git a/lib/puma/plugin/telemetry/config.rb b/lib/puma/plugin/telemetry/config.rb index 387f55a..dbf3d89 100644 --- a/lib/puma/plugin/telemetry/config.rb +++ b/lib/puma/plugin/telemetry/config.rb @@ -62,6 +62,20 @@ class Config # - default: false attr_accessor :socket_telemetry + # Symbol representing method to parse the `Socket::Option`, or + # the whole implementation as a lambda. Available options: + # - `:inspect`, based on the `Socket::Option#inspect` method, + # it's the safest and slowest way to extract the info. `inspect` + # output might not be available, i.e. on AWS Fargate + # - `:unpack`, parse binary data given by `Socket::Option`. Fastest + # way (12x compared to `inspect`) but depends on kernel headers + # and fields ordering within the struct. It should almost always + # match though. DEFAULT + # - proc/lambda, `Socket::Option` will be given as an argument, it + # should return the value of `unacked` field as an integer. + # + attr_accessor :socket_parser + def initialize @enabled = false @initial_delay = 5 @@ -69,6 +83,7 @@ def initialize @targets = [] @puma_telemetry = DEFAULT_PUMA_TELEMETRY @socket_telemetry = false + @socket_parser = :unpack end def enabled? 
diff --git a/lib/puma/plugin/telemetry/data.rb b/lib/puma/plugin/telemetry/data.rb index aa2ee8e..6f51834 100644 --- a/lib/puma/plugin/telemetry/data.rb +++ b/lib/puma/plugin/telemetry/data.rb @@ -100,13 +100,34 @@ def sum_stat(stat) class SocketData UNACKED_REGEXP = /\ unacked=(?\d+)\ /.freeze - def initialize(ios) - @sockets = ios.select { |io| io.respond_to?(:getsockopt) } + def initialize(ios, parser) + @sockets = ios.select { |io| io.respond_to?(:getsockopt) && io.is_a?(TCPSocket) } + @parser = + case parser + when :inspect then method(:parse_with_inspect) + when :unpack then method(:parse_with_unpack) + when Proc then parser + end end # Number of unacknowledged connections in the sockets, which # we know as socket backlog. # + def unacked + @sockets.sum do |socket| + @parser.call(socket.getsockopt(Socket::SOL_TCP, + Socket::TCP_INFO)) + end + end + + def metrics + { + "sockets.backlog" => unacked + } + end + + private + # The Socket::Option returned by `getsockopt` doesn't provide # any kind of accessors for data inside. It decodes it on demand # for `inspect` as strings in C implementation. It looks like @@ -143,21 +164,104 @@ def initialize(ios) # total_retrans=0 # (128 bytes too long)> # - # That's why we have to pull the `unacked` field by parsing - # `inspect` output, instead of using something like `opt.unacked` - def unacked - @sockets.sum do |socket| - tcp_info = socket.getsockopt(Socket::SOL_TCP, Socket::TCP_INFO).inspect - tcp_match = tcp_info.match(UNACKED_REGEXP) + # That's why pulling the `unacked` field by parsing + # `inspect` output is one of the ways to retrieve it. + # + def parse_with_inspect(tcp_info) + tcp_match = tcp_info.inspect.match(UNACKED_REGEXP) - tcp_match[:unacked].to_i - end + return 0 if tcp_match.nil? 
+ + tcp_match[:unacked].to_i end - def metrics - { - "sockets.backlog" => unacked - } + # The above inspect data might not be available everywhere (looking at you + # AWS Fargate Host running on kernel 4.14!), but we might still recover it + # by manually unpacking the binary data based on linux headers. For example + # below is tcp info struct from `linux/tcp.h` header file, from problematic + # host rocking kernel 4.14. + # + # struct tcp_info { + # __u8 tcpi_state; + # __u8 tcpi_ca_state; + # __u8 tcpi_retransmits; + # __u8 tcpi_probes; + # __u8 tcpi_backoff; + # __u8 tcpi_options; + # __u8 tcpi_snd_wscale : 4, tcpi_rcv_wscale : 4; + # __u8 tcpi_delivery_rate_app_limited:1; + # + # __u32 tcpi_rto; + # __u32 tcpi_ato; + # __u32 tcpi_snd_mss; + # __u32 tcpi_rcv_mss; + # + # __u32 tcpi_unacked; + # __u32 tcpi_sacked; + # __u32 tcpi_lost; + # __u32 tcpi_retrans; + # __u32 tcpi_fackets; + # + # /* Times. */ + # __u32 tcpi_last_data_sent; + # __u32 tcpi_last_ack_sent; /* Not remembered, sorry. */ + # __u32 tcpi_last_data_recv; + # __u32 tcpi_last_ack_recv; + # + # /* Metrics. 
*/ + # __u32 tcpi_pmtu; + # __u32 tcpi_rcv_ssthresh; + # __u32 tcpi_rtt; + # __u32 tcpi_rttvar; + # __u32 tcpi_snd_ssthresh; + # __u32 tcpi_snd_cwnd; + # __u32 tcpi_advmss; + # __u32 tcpi_reordering; + # + # __u32 tcpi_rcv_rtt; + # __u32 tcpi_rcv_space; + # + # __u32 tcpi_total_retrans; + # + # __u64 tcpi_pacing_rate; + # __u64 tcpi_max_pacing_rate; + # __u64 tcpi_bytes_acked; /* RFC4898 tcpEStatsAppHCThruOctetsAcked */ + # __u64 tcpi_bytes_received; /* RFC4898 tcpEStatsAppHCThruOctetsReceived */ + # __u32 tcpi_segs_out; /* RFC4898 tcpEStatsPerfSegsOut */ + # __u32 tcpi_segs_in; /* RFC4898 tcpEStatsPerfSegsIn */ + # + # __u32 tcpi_notsent_bytes; + # __u32 tcpi_min_rtt; + # __u32 tcpi_data_segs_in; /* RFC4898 tcpEStatsDataSegsIn */ + # __u32 tcpi_data_segs_out; /* RFC4898 tcpEStatsDataSegsOut */ + # + # __u64 tcpi_delivery_rate; + # + # __u64 tcpi_busy_time; /* Time (usec) busy sending data */ + # __u64 tcpi_rwnd_limited; /* Time (usec) limited by receive window */ + # __u64 tcpi_sndbuf_limited; /* Time (usec) limited by send buffer */ + # }; + # + # Now knowing types and order of fields we can easily parse binary data + # by using + # - `C` flag for `__u8` type - 8-bit unsigned (unsigned char) + # - `L` flag for `__u32` type - 32-bit unsigned, native endian (uint32_t) + # - `Q` flag for `__u64` type - 64-bit unsigned, native endian (uint64_t) + # + # Complete `unpack` would look like `C8 L24 Q4 L6 Q4`, but we are only + # interested in `unacked` field at the moment, that's why we only parse + # till this field by unpacking with `C8 L5`. + # + # If you find that it's not giving correct results, then please fall back + # to inspect, or update this code to accept unpack sequence. But in the + # end unpack is preferable, as it's 12x faster than inspect.
+ # + # Tested against: + # - Amazon Linux 2 with kernel 4.14 & 5.10 + # - Ubuntu 20.04 with kernel 5.13 + # + def parse_with_unpack(tcp_info) + tcp_info.unpack("C8L5").last end end end diff --git a/lib/puma/plugin/telemetry/version.rb b/lib/puma/plugin/telemetry/version.rb index 9a06921..0d0ccf0 100644 --- a/lib/puma/plugin/telemetry/version.rb +++ b/lib/puma/plugin/telemetry/version.rb @@ -3,7 +3,7 @@ module Puma class Plugin module Telemetry - VERSION = "1.1.0.alpha" + VERSION = "1.1.0.beta" end end end diff --git a/spec/fixtures/config.rb b/spec/fixtures/config.rb index f53539e..2858a2d 100644 --- a/spec/fixtures/config.rb +++ b/spec/fixtures/config.rb @@ -4,6 +4,9 @@ lowlevel_error_handler { |_err| [500, {}, ["error page"]] } threads 1, 1 + +bind "unix://#{ENV["BIND_PATH"]}" + plugin "telemetry" Target = Struct.new(:name) do diff --git a/spec/fixtures/default.rb b/spec/fixtures/default.rb index ce06ec2..68892ba 100644 --- a/spec/fixtures/default.rb +++ b/spec/fixtures/default.rb @@ -4,6 +4,9 @@ lowlevel_error_handler { |_err| [500, {}, ["error page"]] } threads 1, 1 + +bind "unix://#{ENV["BIND_PATH"]}" + plugin "telemetry" Puma::Plugin::Telemetry.configure do |config| diff --git a/spec/fixtures/dogstatsd.rb b/spec/fixtures/dogstatsd.rb index 802f872..8463bc6 100644 --- a/spec/fixtures/dogstatsd.rb +++ b/spec/fixtures/dogstatsd.rb @@ -4,6 +4,9 @@ lowlevel_error_handler { |_err| [500, {}, ["error page"]] } threads 1, 1 + +bind "unix://#{ENV["BIND_PATH"]}" + plugin "telemetry" require "datadog/statsd" diff --git a/spec/fixtures/puma_telemetry_subset.rb b/spec/fixtures/puma_telemetry_subset.rb index f4a9231..11c9bde 100644 --- a/spec/fixtures/puma_telemetry_subset.rb +++ b/spec/fixtures/puma_telemetry_subset.rb @@ -5,6 +5,9 @@ threads 1, 2 workers 2 + +bind "unix://#{ENV["BIND_PATH"]}" + plugin "telemetry" Puma::Plugin::Telemetry.configure do |config| diff --git a/spec/fixtures/sockets.rb b/spec/fixtures/sockets.rb new file mode 100644 index 0000000..cda0e71 --- 
/dev/null +++ b/spec/fixtures/sockets.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true + +@initial_delay = true + +app do |_env| + sleep(2) if @initial_delay + + # there's only 1 thread, so it should be fine + @initial_delay = false + + [200, {}, ["embedded app"]] +end + +lowlevel_error_handler { |_err| [500, {}, ["error page"]] } + +threads 1, 1 +plugin "telemetry" + +bind "unix://#{ENV["BIND_PATH"]}" +bind "tcp://localhost:59292" + +Puma::Plugin::Telemetry.configure do |config| + # Simple `key=value` formatter + config.add_target :io, formatter: ->(t) { t.map { |r| r.join("=") }.join(" ") } + config.frequency = 1 + config.enabled = true + + # Check how `queue.backlog` from puma behaves + config.puma_telemetry = ["queue.backlog"] + + # Delay first metric, so puma has time to bootup workers + config.initial_delay = 2 + + config.socket_telemetry! +end diff --git a/spec/integration/plugin_spec.rb b/spec/integration/plugin_spec.rb index ca08b99..17dffd3 100644 --- a/spec/integration/plugin_spec.rb +++ b/spec/integration/plugin_spec.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "timeout" +require "net/http" TestTakesTooLongError = Class.new(StandardError) @@ -93,6 +94,43 @@ class Plugin expect(lines).to eq(expected_telemetry) end end + + context "when sockets telemetry" do + let(:config) { "sockets" } + + def make_request + Thread.new do + Net::HTTP.get_response(URI("http://127.0.0.1:59292/")) + end + end + + it "logs socket telemetry" do + threads = Array.new(2) { make_request } + + sleep 0.1 + + threads += Array.new(5) { make_request } + + true while (line = @server.next_line) !~ /sockets.backlog/ + + line.strip! 
+ + # either "queue.backlog=1 sockets.backlog=5" + # or "queue.backlog=0 sockets.backlog=6" + # + # depending on whenever the first 2 requests are + # pulled at the same time by Puma from backlog + possible_lines = ["queue.backlog=1 sockets.backlog=5", + "queue.backlog=0 sockets.backlog=6"] + + expect(possible_lines.include?(line)).to eq(true) + + total = line.split.sum { |kv| kv.split("=").last.to_i } + expect(total).to eq 6 + + threads.each(&:join) + end + end end end end diff --git a/spec/support/server.rb b/spec/support/server.rb index eaf428a..ee7f57e 100644 --- a/spec/support/server.rb +++ b/spec/support/server.rb @@ -9,13 +9,19 @@ def initialize(config = "config") end def start - @server = IO.popen("PUMA_DEBUG=1 bundle exec puma -C spec/fixtures/#{@config}.rb -b unix://#{bind_path}", "r") + @server = IO.popen("BIND_PATH=#{bind_path} bundle exec puma -C spec/fixtures/#{@config}.rb -v --debug", "r") + @server_pid = @server.pid + + true while next_line !~ /PID:\ / + @puma_pid = @lines.last.split(": ").last.to_i + true while next_line !~ /Ctrl-C/ - @pid = @server.pid end def stop - stop_server + stop_server(@puma_pid) + stop_server(@server_pid) + cleanup_bindfile return if @server.nil? || @server&.closed? @@ -30,27 +36,27 @@ def next_line @lines.last end - private - def bind_path @bind_path ||= Tempfile.create(["", ".sock"], &:path) end + private + def cleanup_bindfile File.unlink(@bind_path) rescue Errno::ENOENT # rubocop:disable Lint/SuppressedException end - def stop_server - return if @pid.nil? + def stop_server(pid) + return if pid.nil? begin - Process.kill :TERM, @pid + Process.kill :KILL, pid rescue Errno::ESRCH # rubocop:disable Lint/SuppressedException end begin - Process.wait2 @pid + Process.wait2 pid rescue Errno::ECHILD # rubocop:disable Lint/SuppressedException end end