Skip to content

Commit

Permalink
Merge pull request #12 from lessonnine/TNT-2741/parse-tcp-info
Browse files Browse the repository at this point in the history
[TNT-2741] Parse TCP INFO binary data
  • Loading branch information
Leszek Zalewski authored Jan 14, 2022
2 parents b2e5852 + e6a77b5 commit bf7648e
Show file tree
Hide file tree
Showing 15 changed files with 253 additions and 27 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
strategy:
matrix:
os: ['ubuntu-18.04', 'ubuntu-20.04']
ruby: ['2.6', '2.7', '3.0']
ruby: ['2.6', '2.7', '3.0', '3.1']

runs-on: ${{ matrix.os }}

Expand Down
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,20 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [1.1.0 Beta]

### Added

Different ways to parse `Socket::Option`. Mainly due to the fact that `#inspect` can't
generate proper data on AWS Fargate, which runs Amazon Linux 2 with 4.14 kernel. So now
besides `#inspect` there's also `#unpack` that parses binary data and picks proper field.

It depends on the kernel, but new fields are usually added at the end of the `tcp_info`
struct, so it should more or less stay stable.

You can configure it by passing in `config.socket_parser = :inspect` or
`config.socket_parser = ->(opt) { your implementation }`.

## [1.1.0 Alpha]

### Added
Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
puma-plugin-telemetry (1.1.0.alpha)
puma-plugin-telemetry (1.1.0.beta)
puma (>= 5.0)

GEM
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ Puma::Plugin::Telemetry.configure do |config|
config.initial_delay = 10
config.frequency = 30
config.puma_telemetry = %w[workers.requests_count queue.backlog queue.capacity]
config.socket_telemetry!
config.socket_parser = :inspect
config.add_target :io, formatter: :json, io: StringIO.new
config.add_target :dogstatsd, client: Datadog::Statsd.new(tags: { env: ENV["RAILS_ENV"] })
end
Expand Down
2 changes: 1 addition & 1 deletion lib/puma/plugin/telemetry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def socket_telemetry(telemetry, launcher)
return telemetry if launcher.nil?
return telemetry unless config.socket_telemetry?

telemetry.merge! SocketData.new(launcher.binder.ios)
telemetry.merge! SocketData.new(launcher.binder.ios, config.socket_parser)
.metrics

telemetry
Expand Down
15 changes: 15 additions & 0 deletions lib/puma/plugin/telemetry/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,28 @@ class Config
# - default: false
attr_accessor :socket_telemetry

# Symbol representing method to parse the `Socket::Option`, or
# the whole implementation as a lambda. Available options:
# - `:inspect`, based on the `Socket::Option#inspect` method,
# it's the safest and slowest way to extract the info. `inspect`
# output might not be available, e.g. on AWS Fargate
# - `:unpack`, parse binary data given by `Socket::Option`. Fastest
# way (12x compared to `inspect`) but depends on kernel headers
# and fields ordering within the struct. It should almost always
# match though. DEFAULT
# - proc/lambda, `Socket::Option` will be given as an argument, it
# should return the value of `unacked` field as an integer.
#
attr_accessor :socket_parser

# Sets every option to its default value.
def initialize
# Telemetry stays off until explicitly enabled.
@enabled = false
@initial_delay = 5
@frequency = 5
# No targets are registered by default.
@targets = []
@puma_telemetry = DEFAULT_PUMA_TELEMETRY
# Socket telemetry is opt-in.
@socket_telemetry = false
# Default way of parsing `Socket::Option`: unpack the binary data.
@socket_parser = :unpack
end

def enabled?
Expand Down
132 changes: 118 additions & 14 deletions lib/puma/plugin/telemetry/data.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,34 @@ def sum_stat(stat)
class SocketData
UNACKED_REGEXP = /\ unacked=(?<unacked>\d+)\ /.freeze

def initialize(ios)
@sockets = ios.select { |io| io.respond_to?(:getsockopt) }
# Builds the socket telemetry collector.
#
# ios    - collection of IO objects; only the ones that respond to
#          `getsockopt` and are `TCPSocket`s are kept.
# parser - `:inspect`, `:unpack`, or any object responding to `call`
#          (proc, lambda, Method, ...) that is given a `Socket::Option`
#          and returns the `unacked` value as an integer.
#
# Raises ArgumentError when `parser` is none of the above, instead of
# silently storing nil and failing later with NoMethodError on nil.
def initialize(ios, parser)
  @sockets = ios.select { |io| io.respond_to?(:getsockopt) && io.is_a?(TCPSocket) }
  @parser =
    case parser
    when :inspect then method(:parse_with_inspect)
    when :unpack then method(:parse_with_unpack)
    else
      unless parser.respond_to?(:call)
        raise ArgumentError,
              "Unknown socket parser: #{parser.inspect}, expected :inspect, :unpack or a callable"
      end

      parser
    end
end

# Number of unacknowledged connections in the sockets, which
# we know as socket backlog.
#
# Adds up, across all tracked sockets, the value the configured parser
# extracts from each socket's TCP_INFO option.
def unacked
  @sockets.sum do |io|
    tcp_info = io.getsockopt(Socket::SOL_TCP, Socket::TCP_INFO)
    @parser.call(tcp_info)
  end
end

# Socket metrics keyed by the name they are reported under.
def metrics
  { "sockets.backlog" => unacked }
end

private

# The Socket::Option returned by `getsockopt` doesn't provide
# any kind of accessors for data inside. It decodes it on demand
# for `inspect` as strings in C implementation. It looks like
Expand Down Expand Up @@ -143,21 +164,104 @@ def initialize(ios)
# total_retrans=0
# (128 bytes too long)>
#
# That's why we have to pull the `unacked` field by parsing
# `inspect` output, instead of using something like `opt.unacked`
def unacked
@sockets.sum do |socket|
tcp_info = socket.getsockopt(Socket::SOL_TCP, Socket::TCP_INFO).inspect
tcp_match = tcp_info.match(UNACKED_REGEXP)
# That's why pulling the `unacked` field by parsing
# `inspect` output is one of the ways to retrieve it.
#
def parse_with_inspect(tcp_info)
tcp_match = tcp_info.inspect.match(UNACKED_REGEXP)

tcp_match[:unacked].to_i
end
return 0 if tcp_match.nil?

tcp_match[:unacked].to_i
end

def metrics
{
"sockets.backlog" => unacked
}
# The above inspect data might not be available everywhere (looking at you
# AWS Fargate Host running on kernel 4.14!), but we might still recover it
# by manually unpacking the binary data based on linux headers. For example
# below is tcp info struct from `linux/tcp.h` header file, from problematic
# host rocking kernel 4.14.
#
# struct tcp_info {
# __u8 tcpi_state;
# __u8 tcpi_ca_state;
# __u8 tcpi_retransmits;
# __u8 tcpi_probes;
# __u8 tcpi_backoff;
# __u8 tcpi_options;
# __u8 tcpi_snd_wscale : 4, tcpi_rcv_wscale : 4;
# __u8 tcpi_delivery_rate_app_limited:1;
#
# __u32 tcpi_rto;
# __u32 tcpi_ato;
# __u32 tcpi_snd_mss;
# __u32 tcpi_rcv_mss;
#
# __u32 tcpi_unacked;
# __u32 tcpi_sacked;
# __u32 tcpi_lost;
# __u32 tcpi_retrans;
# __u32 tcpi_fackets;
#
# /* Times. */
# __u32 tcpi_last_data_sent;
# __u32 tcpi_last_ack_sent; /* Not remembered, sorry. */
# __u32 tcpi_last_data_recv;
# __u32 tcpi_last_ack_recv;
#
# /* Metrics. */
# __u32 tcpi_pmtu;
# __u32 tcpi_rcv_ssthresh;
# __u32 tcpi_rtt;
# __u32 tcpi_rttvar;
# __u32 tcpi_snd_ssthresh;
# __u32 tcpi_snd_cwnd;
# __u32 tcpi_advmss;
# __u32 tcpi_reordering;
#
# __u32 tcpi_rcv_rtt;
# __u32 tcpi_rcv_space;
#
# __u32 tcpi_total_retrans;
#
# __u64 tcpi_pacing_rate;
# __u64 tcpi_max_pacing_rate;
# __u64 tcpi_bytes_acked; /* RFC4898 tcpEStatsAppHCThruOctetsAcked */
# __u64 tcpi_bytes_received; /* RFC4898 tcpEStatsAppHCThruOctetsReceived */
# __u32 tcpi_segs_out; /* RFC4898 tcpEStatsPerfSegsOut */
# __u32 tcpi_segs_in; /* RFC4898 tcpEStatsPerfSegsIn */
#
# __u32 tcpi_notsent_bytes;
# __u32 tcpi_min_rtt;
# __u32 tcpi_data_segs_in; /* RFC4898 tcpEStatsDataSegsIn */
# __u32 tcpi_data_segs_out; /* RFC4898 tcpEStatsDataSegsOut */
#
# __u64 tcpi_delivery_rate;
#
# __u64 tcpi_busy_time; /* Time (usec) busy sending data */
# __u64 tcpi_rwnd_limited; /* Time (usec) limited by receive window */
# __u64 tcpi_sndbuf_limited; /* Time (usec) limited by send buffer */
# };
#
# Now knowing the types and order of fields we can easily parse binary data
# by using
# - `C` flag for `__u8` type - 8-bit unsigned (unsigned char)
# - `L` flag for `__u32` type - 32-bit unsigned, native endian (uint32_t)
# - `Q` flag for `__u64` type - 64-bit unsigned, native endian (uint64_t)
#
# Complete `unpack` would look like `C8 L24 Q4 L6 Q4`, but we are only
# interested in `unacked` field at the moment, that's why we only parse
# till this field by unpacking with `C8 L5`.
#
# If you find that it's not giving correct results, then please fall back
# to inspect, or update this code to accept unpack sequence. But in the
# end unpack is preferable, as it's 12x faster than inspect.
#
# Tested against:
# - Amazon Linux 2 with kernel 4.14 & 5.10
# - Ubuntu 20.04 with kernel 5.13
#
# Reads the `unacked` field straight from the binary tcp_info data.
#
# tcp_info - object responding to `unpack` (a `Socket::Option`).
#
# Returns the fifth 32-bit value following the eight leading bytes as an
# Integer. When the data is too short to contain that field `unpack`
# yields nil for it, so the result is coerced to 0 rather than letting a
# nil break the summing of socket backlogs.
def parse_with_unpack(tcp_info)
  tcp_info.unpack("C8L5").last.to_i
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/puma/plugin/telemetry/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module Puma
class Plugin
module Telemetry
VERSION = "1.1.0.alpha"
VERSION = "1.1.0.beta"
end
end
end
3 changes: 3 additions & 0 deletions spec/fixtures/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
lowlevel_error_handler { |_err| [500, {}, ["error page"]] }

threads 1, 1

bind "unix://#{ENV["BIND_PATH"]}"

plugin "telemetry"

Target = Struct.new(:name) do
Expand Down
3 changes: 3 additions & 0 deletions spec/fixtures/default.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
lowlevel_error_handler { |_err| [500, {}, ["error page"]] }

threads 1, 1

bind "unix://#{ENV["BIND_PATH"]}"

plugin "telemetry"

Puma::Plugin::Telemetry.configure do |config|
Expand Down
3 changes: 3 additions & 0 deletions spec/fixtures/dogstatsd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
lowlevel_error_handler { |_err| [500, {}, ["error page"]] }

threads 1, 1

bind "unix://#{ENV["BIND_PATH"]}"

plugin "telemetry"

require "datadog/statsd"
Expand Down
3 changes: 3 additions & 0 deletions spec/fixtures/puma_telemetry_subset.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

threads 1, 2
workers 2

bind "unix://#{ENV["BIND_PATH"]}"

plugin "telemetry"

Puma::Plugin::Telemetry.configure do |config|
Expand Down
35 changes: 35 additions & 0 deletions spec/fixtures/sockets.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# frozen_string_literal: true

@initial_delay = true

app do |_env|
sleep(2) if @initial_delay

# there's only 1 thread, so it should be fine
@initial_delay = false

[200, {}, ["embedded app"]]
end

lowlevel_error_handler { |_err| [500, {}, ["error page"]] }

threads 1, 1
plugin "telemetry"

bind "unix://#{ENV["BIND_PATH"]}"
bind "tcp://localhost:59292"

Puma::Plugin::Telemetry.configure do |config|
# Simple `key=value` formatter
config.add_target :io, formatter: ->(t) { t.map { |r| r.join("=") }.join(" ") }
config.frequency = 1
config.enabled = true

# Check how `queue.backlog` from puma behaves
config.puma_telemetry = ["queue.backlog"]

# Delay first metric, so puma has time to bootup workers
config.initial_delay = 2

config.socket_telemetry!
end
38 changes: 38 additions & 0 deletions spec/integration/plugin_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require "timeout"
require "net/http"

TestTakesTooLongError = Class.new(StandardError)

Expand Down Expand Up @@ -93,6 +94,43 @@ class Plugin
expect(lines).to eq(expected_telemetry)
end
end

context "when sockets telemetry" do
let(:config) { "sockets" }

# Issues a GET request to http://127.0.0.1:59292/ from a new thread and
# returns that thread, so the caller can fire several requests without
# waiting for each response.
def make_request
  Thread.new { Net::HTTP.get_response(URI("http://127.0.0.1:59292/")) }
end

it "logs socket telemetry" do
threads = Array.new(2) { make_request }

sleep 0.1

threads += Array.new(5) { make_request }

true while (line = @server.next_line) !~ /sockets.backlog/

line.strip!

# either "queue.backlog=1 sockets.backlog=5"
# or "queue.backlog=0 sockets.backlog=6"
#
# depending on whether the first 2 requests are
# pulled at the same time by Puma from backlog
possible_lines = ["queue.backlog=1 sockets.backlog=5",
"queue.backlog=0 sockets.backlog=6"]

expect(possible_lines.include?(line)).to eq(true)

total = line.split.sum { |kv| kv.split("=").last.to_i }
expect(total).to eq 6

threads.each(&:join)
end
end
end
end
end
Loading

0 comments on commit bf7648e

Please sign in to comment.