proxy: fixes for memory tracking
- fix ability to disable buffer memory limiter.
- fix memory accounting leak for ascii multigets

In the ascii multiget case it was over-accounting for the "END\r\n"
marker, leaking 5 bytes per key read.
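
To make the accounting fix easier to follow before reading the hunks, here is a small self-contained illustration of the imbalance and the repair. The struct, the reserve/finish/release helpers, and the standalone counter are simplified stand-ins invented for this sketch; only the blen/extra bookkeeping and the final blen + extra subtraction mirror the hunks below.

    #include <assert.h>
    #include <stdint.h>

    #define ENDLEN 5 /* strlen("END\r\n") */

    /* Simplified stand-ins for the proxy's response object and the per-thread
     * proxy_buffer_memory_used counter -- not the real definitions. */
    struct resp {
        unsigned int blen; /* bytes of the value held in the buffer */
        uint8_t extra;     /* END bytes accounted but never appended (this commit) */
    };

    static long memory_used = 0;

    /* Reserve buffer space for a value plus the shared "END\r\n" marker. */
    static void reserve(struct resp *r, unsigned int vlen) {
        r->blen = vlen;
        r->extra = 0;
        memory_used += vlen + ENDLEN;
    }

    /* Finish a read: either the END marker is really appended (blen grows by 5),
     * or -- the ascii multiget case -- it is skipped and remembered in extra. */
    static void finish(struct resp *r, int append_end) {
        if (append_end) {
            r->blen += ENDLEN;
        } else {
            r->extra = ENDLEN;
        }
    }

    /* Release on GC: before the fix only blen was subtracted, so the multiget
     * path left ENDLEN (5) bytes accounted forever per key read. */
    static void release(struct resp *r) {
        memory_used -= r->blen + r->extra;
    }

    int main(void) {
        struct resp r;
        reserve(&r, 100);
        finish(&r, 0);            /* multiget sub-response: END not appended */
        release(&r);
        assert(memory_used == 0); /* balances only because extra is refunded */
        return 0;
    }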
dormando committed Jun 15, 2023
1 parent 2315e0c commit ed875c5
Showing 7 changed files with 362 additions and 221 deletions.
3 changes: 2 additions & 1 deletion proxy.h
@@ -408,12 +408,13 @@ typedef struct {
char *buf; // response line + potentially value.
mc_resp *cresp; // client mc_resp object during extstore fetches.
LIBEVENT_THREAD *thread; // cresp's owner thread needed for extstore cleanup.
- size_t blen; // total size of the value to read.
+ unsigned int blen; // total size of the value to read.
struct timeval start; // time this object was created.
long elapsed; // time elapsed once handled.
int status; // status code from mcmc_read()
int bread; // amount of bytes read into value so far.
uint8_t cmd; // from parser (pr.command)
+ uint8_t extra; // ascii multiget hack for memory accounting. extra blen.
enum mcp_resp_mode mode; // reply mode (for noreply fixing)
char be_name[MAX_NAMELEN+1];
char be_port[MAX_PORTLEN+1];
5 changes: 2 additions & 3 deletions proxy_lua.c
@@ -88,7 +88,7 @@ static int mcplib_response_gc(lua_State *L) {
// FIXME: we handle the accounting here, but the actual response buffer is
// freed elsewhere, after the network write.
pthread_mutex_lock(&t->proxy_limit_lock);
- t->proxy_buffer_memory_used -= r->blen;
+ t->proxy_buffer_memory_used -= r->blen + r->extra;
pthread_mutex_unlock(&t->proxy_limit_lock);

// On error/similar we might be holding the read buffer.
@@ -891,9 +891,8 @@ static int mcplib_buffer_memory_limit(lua_State *L) {
if (limit > tcount * 2) {
limit /= tcount;
}
-
- ctx->buffer_memory_limit = limit;
}
+ ctx->buffer_memory_limit = limit;

return 0;
}
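
As a companion to the second hunk above, here is a small self-contained sketch of the control-flow change behind the first commit message bullet. The scaling step is taken from the hunk; the outer guard, the helper name, and the struct are assumptions made for illustration, inferred from the hunk's closing brace and from the fact that a limit of 0 previously failed to disable the limiter.

    #include <assert.h>

    /* Simplified stand-in for the proxy context; the field name follows the
     * buffer_memory_limit setting, the rest is invented for this sketch. */
    struct ctx { unsigned int buffer_memory_limit; };

    /* Post-fix shape: the assignment runs unconditionally, so passing 0
     * reaches the context and turns the limiter back off. */
    static void set_limit(struct ctx *ctx, unsigned int limit, int tcount) {
        if (limit > 0) {                      /* assumed guard around the scaling */
            if (limit > (unsigned int)tcount * 2) {
                limit /= tcount;              /* split the global limit per worker */
            }
            /* pre-fix: ctx->buffer_memory_limit = limit;  (skipped when limit == 0) */
        }
        ctx->buffer_memory_limit = limit;     /* post-fix: 0 now disables the limiter */
    }

    int main(void) {
        struct ctx c = { .buffer_memory_limit = 200 };
        set_limit(&c, 0, 4);
        assert(c.buffer_memory_limit == 0);   /* limiter can be disabled again */
        return 0;
    }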
2 changes: 2 additions & 0 deletions proxy_network.c
@@ -581,6 +581,8 @@ static int proxy_backend_drive_machine(mcp_backend_t *be) {
// markers down here.
memcpy(r->buf+r->blen, ENDSTR, ENDLEN);
r->blen += 5;
+ } else {
+ r->extra = 5;
}

// advance buffer
51 changes: 0 additions & 51 deletions t/proxyconfig.lua
@@ -39,57 +39,6 @@ function mcp_config_pools(old)
test = mcp.pool({b1, b2, b3}, { iothread = false })
}
return pools
elseif mode == "reqlimit" then
mcp.active_req_limit(4)
local b1 = mcp.backend('b1', '127.0.0.1', 11511)
local b2 = mcp.backend('b2', '127.0.0.1', 11512)
local b3 = mcp.backend('b3', '127.0.0.1', 11513)

-- Direct all traffic at a single backend to simplify the test.
local pools = {
test = mcp.pool({b1}),
hold = mcp.pool({b2, b3})
}
return pools
elseif mode == "noreqlimit" then
mcp.active_req_limit(0)
local b1 = mcp.backend('b1', '127.0.0.1', 11511)
local b2 = mcp.backend('b2', '127.0.0.1', 11512)
local b3 = mcp.backend('b3', '127.0.0.1', 11513)

-- Direct all traffic at a single backend to simplify the test.
local pools = {
test = mcp.pool({b1}),
hold = mcp.pool({b2, b3})
}
return pools
elseif mode == "buflimit" or mode == "buflimit2" then
mcp.buffer_memory_limit(20)
if mode == "buflimit2" then
mcp.buffer_memory_limit(200)
end
local b1 = mcp.backend('b1', '127.0.0.1', 11511)
local b2 = mcp.backend('b2', '127.0.0.1', 11512)
local b3 = mcp.backend('b3', '127.0.0.1', 11513)

-- Direct all traffic at a single backend to simplify the test.
local pools = {
test = mcp.pool({b1}),
hold = mcp.pool({b2, b3})
}
return pools
elseif mode == "nobuflimit" then
mcp.buffer_memory_limit(0)
local b1 = mcp.backend('b1', '127.0.0.1', 11511)
local b2 = mcp.backend('b2', '127.0.0.1', 11512)
local b3 = mcp.backend('b3', '127.0.0.1', 11513)

-- Direct all traffic at a single backend to simplify the test.
local pools = {
test = mcp.pool({b1}),
hold = mcp.pool({b2, b3})
}
return pools
end
end

166 changes: 0 additions & 166 deletions t/proxyconfig.t
@@ -315,175 +315,9 @@ is(<$watcher>, "OK\r\n", "watcher enabled");
check_version($ps);
}

###
# diag "starting proxy again from scratch"
###

# TODO: probably time to abstract the entire "start the server with mocked
# listeners" routine.
$watcher = undef;
write_modefile('return "reqlimit"');
$p_srv->stop;
while (1) {
if ($p_srv->is_running) {
sleep 1;
} else {
ok(!$p_srv->is_running, "stopped proxy");
last;
}
}

@mocksrvs = ();
# re-create the mock servers so we get clean connects, the previous
# backends could be reconnecting still.
for my $port (11511, 11512, 11513) {
my $srv = mock_server($port);
ok(defined $srv, "mock server created");
push(@mocksrvs, $srv);
}

# Start up a clean server.
# Since limits are per worker thread, cut the worker threads down to 1 to ease
# testing.
$p_srv = new_memcached('-o proxy_config=./t/proxyconfig.lua -t 1');
$ps = $p_srv->sock;
$ps->autoflush(1);

{
for my $msrv ($mocksrvs[0], $mocksrvs[1], $mocksrvs[2]) {
my $be = accept_backend($msrv);
push(@mbe, $be);
}

my $stats = mem_stats($ps, 'proxy');
isnt($stats->{active_req_limit}, 0, "active request limit is set");

# active request limit is 4, pipeline 6 requests and ensure the last two
# get junked
my $cmd = '';
for ('a', 'b', 'c', 'd', 'e', 'f') {
$cmd .= "mg /test/$_\r\n";
}
print $ps $cmd;

# Lua config only sends commands to the first backend for this test.
my $be = $mbe[0];
for (1 .. 4) {
like(<$be>, qr/^mg \/test\/\w\r\n$/, "backend received mg");
print $be "EN\r\n";
}
my $s = IO::Select->new();
$s->add($be);
my @readable = $s->can_read(0.25);
is(scalar @readable, 0, "no more pending reads on backend");

for (1 .. 4) {
is(scalar <$ps>, "EN\r\n", "received miss from backend");
}

is(scalar <$ps>, "SERVER_ERROR active request limit reached\r\n", "got error back");
is(scalar <$ps>, "SERVER_ERROR active request limit reached\r\n", "got two limit errors");

# Test turning the limit back off.
write_modefile('return "noreqlimit"');
$watcher = $p_srv->new_sock;
print $watcher "watch proxyevents\n";
is(<$watcher>, "OK\r\n", "watcher enabled");
$p_srv->reload();
wait_reload($watcher);

$stats = mem_stats($ps, 'proxy');
is($stats->{active_req_limit}, 0, "active request limit unset");

$cmd = '';
for ('a', 'b', 'c', 'd', 'e', 'f') {
$cmd .= "mg /test/$_\r\n";
}
print $ps $cmd;
for (1 .. 6) {
like(<$be>, qr/^mg \/test\/\w\r\n$/, "backend received mg");
print $be "EN\r\n";
}
for (1 .. 6) {
is(scalar <$ps>, "EN\r\n", "received miss from backend");
}
}

{
# Test the buffer memory limiter.
# - limit per worker will be 1/t global limit
write_modefile('return "buflimit"');
$p_srv->reload();
wait_reload($watcher);
# Get a secondary client to trample limit.
my $sps = $p_srv->new_sock;

my $stats = mem_stats($ps, 'proxy');
isnt($stats->{buffer_memory_limit}, 0, "buf mem limit is set");

# - test SET commands with values, but nothing being read on backend
my $data = 'x' x 30000;
my $cmd = "ms foo 30000 T30\r\n" . $data . "\r\n";
print $ps $cmd;

my $be = $mbe[0];
my $s = IO::Select->new;
$s->add($be);
# Wait until the backend has the request queued, then send the second one.
my @readable = $s->can_read(1);
print $sps $cmd;

my $res;
is(scalar <$be>, "ms foo 30000 T30\r\n", "received first ms");
$res = scalar <$be>;
print $be "HD\r\n";

# The second request should have been caught by the memory limiter
is(scalar <$sps>, "SERVER_ERROR out of memory\r\n", "got server error");
# FIXME: The original response cannot succeed because we cannot allocate
# enough memory to read the response from the backend.
# This is conveniently testing both paths right here but I would prefer
# something better.
# TODO: need to see if it's possible to surface an OOM from the backend
# handler, but that requires more rewiring.
is(scalar <$ps>, "SERVER_ERROR backend failure\r\n", "first request succeeded");

# Backend gets killed from a read OOM, so we need to re-establish.
$be = $mbe[0] = accept_backend($mocksrvs[0]);
like(<$watcher>, qr/error=outofmemory/, "OOM log line");

# Memory limits won't drop until the garbage collectors run, which
# requires a bit more push, so instead we raise the limits here so we can
# retry from the pre-existing connections to test swallow mode.
write_modefile('return "buflimit2"');
$p_srv->reload();
wait_reload($watcher);

# Test sending another request down both pipes to ensure they still work.
$cmd = "ms foo 2 T30\r\nhi\r\n";
print $ps $cmd;
is(scalar <$be>, "ms foo 2 T30\r\n", "client works after oom");
is(scalar <$be>, "hi\r\n", "client works after oom");
print $be "HD\r\n";
is(scalar <$ps>, "HD\r\n", "client received resp after oom");
print $sps $cmd;
is(scalar <$be>, "ms foo 2 T30\r\n", "client works after oom");
is(scalar <$be>, "hi\r\n", "client works after oom");
print $be "HD\r\n";
is(scalar <$sps>, "HD\r\n", "client received resp after oom");

# - test GET commands but don't read back, large backend values
# - test disabling the limiter
# extended testing:
# - create errors while holding the buffers?
}

# TODO:
# check reqlimit/bwlimit counters
# remove backends
# do dead sockets close?
# adding user stats
# changing user stats
# adding backends with the same label don't create more connections
# total backend counters
# change top level routes mid-request
80 changes: 80 additions & 0 deletions t/proxylimits.lua
@@ -0,0 +1,80 @@
-- need to use a global counter to avoid losing it on reload.
-- not really sure this'll work forever, but even if it doesn't I should allow
-- some method of persisting data across reloads.
if reload_count == nil then
reload_count = 0
end

function mcp_config_pools(old)
mcp.backend_read_timeout(4)
mcp.backend_connect_timeout(5)
reload_count = reload_count + 1

if reload_count == 1 then
-- set a low request limit.
mcp.active_req_limit(4)
local b1 = mcp.backend('b1', '127.0.0.1', 11711)
local b2 = mcp.backend('b2', '127.0.0.1', 11712)
local b3 = mcp.backend('b3', '127.0.0.1', 11713)

-- Direct all traffic at a single backend to simplify the test.
local pools = {
test = mcp.pool({b1}),
hold = mcp.pool({b2, b3})
}
return pools
elseif reload_count == 2 then
-- removing the request limit.
mcp.active_req_limit(0)
local b1 = mcp.backend('b1', '127.0.0.1', 11711)
local b2 = mcp.backend('b2', '127.0.0.1', 11712)
local b3 = mcp.backend('b3', '127.0.0.1', 11713)

-- Direct all traffic at a single backend to simplify the test.
local pools = {
test = mcp.pool({b1}),
hold = mcp.pool({b2, b3})
}
return pools
elseif reload_count == 3 or reload_count == 4 then
-- adding the memory buffer limit (absurdly low)
mcp.buffer_memory_limit(20)
if reload_count == 4 then
-- raise it a bit but still limited.
mcp.buffer_memory_limit(200)
end
local b1 = mcp.backend('b1', '127.0.0.1', 11711)
local b2 = mcp.backend('b2', '127.0.0.1', 11712)
local b3 = mcp.backend('b3', '127.0.0.1', 11713)

-- Direct all traffic at a single backend to simplify the test.
local pools = {
test = mcp.pool({b1}),
hold = mcp.pool({b2, b3})
}
return pools
elseif reload_count == 5 then
-- remove the buffer limit entirely.
mcp.buffer_memory_limit(0)
local b1 = mcp.backend('b1', '127.0.0.1', 11711)
local b2 = mcp.backend('b2', '127.0.0.1', 11712)
local b3 = mcp.backend('b3', '127.0.0.1', 11713)

-- Direct all traffic at a single backend to simplify the test.
local pools = {
test = mcp.pool({b1}),
hold = mcp.pool({b2, b3})
}
return pools
end
end

-- At least to start we don't need to test every command, but we should do
-- some tests against the two broad types of commands (gets vs sets with
-- payloads)
function mcp_config_routes(zones)
mcp.attach(mcp.CMD_MG, function(r) return zones["test"](r) end)
mcp.attach(mcp.CMD_MS, function(r) return zones["test"](r) end)
mcp.attach(mcp.CMD_SET, function(r) return zones["test"](r) end)
mcp.attach(mcp.CMD_GET, function(r) return zones["test"](r) end)
end