Skip to content

Commit

Permalink
core: allow forcing protocol per listener socket
Browse files Browse the repository at this point in the history
-l proto[ascii]:127.0.0.1:11211

accepts:

- ascii
- binary
- negotiating
- proxy

Allows running proxy on default listeners but direct to memcached on a
specific port, or binary and ascii on different ports, or etc.
  • Loading branch information
dormando committed Aug 25, 2022
1 parent 6bcc5b8 commit 4c919bd
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 14 deletions.
64 changes: 54 additions & 10 deletions memcached.c
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,8 @@ void conn_io_queue_return(io_pending_t *io) {
conn *conn_new(const int sfd, enum conn_states init_state,
const int event_flags,
const int read_buffer_size, enum network_transport transport,
struct event_base *base, void *ssl, uint64_t conntag) {
struct event_base *base, void *ssl, uint64_t conntag,
enum protocol bproto) {
conn *c;

assert(sfd >= 0 && sfd < max_fds);
Expand Down Expand Up @@ -693,7 +694,7 @@ conn *conn_new(const int sfd, enum conn_states init_state,
}

c->transport = transport;
c->protocol = settings.binding_protocol;
c->protocol = bproto;
c->tag = conntag;

/* unix socket mode doesn't need this, so zeroed out. but why
Expand Down Expand Up @@ -3073,7 +3074,7 @@ static void drive_machine(conn *c) {
#endif

dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
READ_BUFFER_CACHED, c->transport, ssl_v, c->tag);
READ_BUFFER_CACHED, c->transport, ssl_v, c->tag, c->protocol);
}

stop = true;
Expand Down Expand Up @@ -3474,7 +3475,8 @@ static int server_socket(const char *interface,
int port,
enum network_transport transport,
FILE *portnumber_file, bool ssl_enabled,
uint64_t conntag) {
uint64_t conntag,
enum protocol bproto) {
int sfd;
struct linger ling = {0, 0};
struct addrinfo *ai;
Expand Down Expand Up @@ -3616,12 +3618,12 @@ static int server_socket(const char *interface,
}
dispatch_conn_new(per_thread_fd, conn_read,
EV_READ | EV_PERSIST,
UDP_READ_BUFFER_SIZE, transport, NULL, conntag);
UDP_READ_BUFFER_SIZE, transport, NULL, conntag, bproto);
}
} else {
if (!(listen_conn_add = conn_new(sfd, conn_listening,
EV_READ | EV_PERSIST, 1,
transport, main_base, NULL, conntag))) {
transport, main_base, NULL, conntag, bproto))) {
fprintf(stderr, "failed to create listening connection\n");
exit(EXIT_FAILURE);
}
Expand Down Expand Up @@ -3652,7 +3654,7 @@ static int server_sockets(int port, enum network_transport transport,
#endif

if (settings.inter == NULL) {
return server_socket(settings.inter, port, transport, portnumber_file, ssl_enabled, conntag);
return server_socket(settings.inter, port, transport, portnumber_file, ssl_enabled, conntag, settings.binding_protocol);
} else {
// tokenize them and bind to each one of them..
char *b;
Expand Down Expand Up @@ -3683,8 +3685,50 @@ static int server_sockets(int port, enum network_transport transport,
}
#endif

// Allow forcing the protocol of this listener.
const char *protostr = "proto";
enum protocol bproto = settings.binding_protocol;
if (strncmp(p, protostr, strlen(protostr)) == 0) {
p += strlen(protostr);
if (*p == '[') {
char *e = strchr(p, ']');
if (e == NULL) {
fprintf(stderr, "Invalid protocol spec: \"%s\"\n", p);
free(list);
return 1;
}
char *st = ++p; // skip '[';
*e = '\0';
size_t len = e - st;
p = ++e; // skip ']'
p++; // skip an assumed ':'

if (strncmp(st, "ascii", len) == 0) {
bproto = ascii_prot;
} else if (strncmp(st, "binary", len) == 0) {
bproto = binary_prot;
} else if (strncmp(st, "negotiating", len) == 0) {
bproto = negotiating_prot;
} else if (strncmp(st, "proxy", len) == 0) {
#ifdef PROXY
if (settings.proxy_enabled) {
bproto = proxy_prot;
} else {
fprintf(stderr, "Proxy must be enabled to use: \"%s\"\n", list);
free(list);
return 1;
}
#else
fprintf(stderr, "Server not built with proxy: \"%s\"\n", list);
free(list);
return 1;
#endif
}
}
}

const char *tagstr = "tag";
if (strncmp(p, "tag", strlen(tagstr)) == 0) {
if (strncmp(p, tagstr, strlen(tagstr)) == 0) {
p += strlen(tagstr);
if (*p == '[') {
char *e = strchr(p, ']');
Expand Down Expand Up @@ -3750,7 +3794,7 @@ static int server_sockets(int port, enum network_transport transport,
if (strcmp(p, "*") == 0) {
p = NULL;
}
ret |= server_socket(p, the_port, transport, portnumber_file, ssl_enabled, conntag);
ret |= server_socket(p, the_port, transport, portnumber_file, ssl_enabled, conntag, bproto);
if (ret != 0 && errno_save == 0) errno_save = errno;
}
free(list);
Expand Down Expand Up @@ -3830,7 +3874,7 @@ static int server_socket_unix(const char *path, int access_mask) {
}
if (!(listen_conn = conn_new(sfd, conn_listening,
EV_READ | EV_PERSIST, 1,
local_transport, main_base, NULL, 0))) {
local_transport, main_base, NULL, 0, settings.binding_protocol))) {
fprintf(stderr, "failed to create listening connection\n");
exit(EXIT_FAILURE);
}
Expand Down
4 changes: 2 additions & 2 deletions memcached.h
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ io_queue_t *conn_io_queue_get(conn *c, int type);
io_queue_cb_t *thread_io_queue_get(LIBEVENT_THREAD *t, int type);
void conn_io_queue_return(io_pending_t *io);
conn *conn_new(const int sfd, const enum conn_states init_state, const int event_flags, const int read_buffer_size,
enum network_transport transport, struct event_base *base, void *ssl, uint64_t conntag);
enum network_transport transport, struct event_base *base, void *ssl, uint64_t conntag, enum protocol bproto);

void conn_worker_readd(conn *c);
extern int daemonize(int nochdir, int noclose);
Expand Down Expand Up @@ -946,7 +946,7 @@ void proxy_reload_notify(LIBEVENT_THREAD *t);
#endif
void return_io_pending(io_pending_t *io);
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, int read_buffer_size,
enum network_transport transport, void *ssl, uint64_t conntag);
enum network_transport transport, void *ssl, uint64_t conntag, enum protocol bproto);
void sidethread_conn_close(conn *c);

/* Lock wrappers for cache functions that are called from main loop. */
Expand Down
6 changes: 4 additions & 2 deletions thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ struct conn_queue_item {
conn *c;
void *ssl;
uint64_t conntag;
enum protocol bproto;
io_pending_t *io; // IO when used for deferred IO handling.
STAILQ_ENTRY(conn_queue_item) i_next;
};
Expand Down Expand Up @@ -568,7 +569,7 @@ static void thread_libevent_process(evutil_socket_t fd, short which, void *arg)
case queue_new_conn:
c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport,
me->base, item->ssl, item->conntag);
me->base, item->ssl, item->conntag, item->bproto);
if (c == NULL) {
if (IS_UDP(item->transport)) {
fprintf(stderr, "Can't listen for events on UDP socket\n");
Expand Down Expand Up @@ -715,7 +716,7 @@ static LIBEVENT_THREAD *select_thread_by_napi_id(int sfd)
*/
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
int read_buffer_size, enum network_transport transport, void *ssl,
uint64_t conntag) {
uint64_t conntag, enum protocol bproto) {
CQ_ITEM *item = NULL;
LIBEVENT_THREAD *thread;

Expand All @@ -740,6 +741,7 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
item->mode = queue_new_conn;
item->ssl = ssl;
item->conntag = conntag;
item->bproto = bproto;

MEMCACHED_CONN_DISPATCH(sfd, (int64_t)thread->thread_id);
notify_worker(thread, item);
Expand Down

0 comments on commit 4c919bd

Please sign in to comment.