Skip to content

Commit

Permalink
rpc: limited multithread support for svc_nl
Browse files Browse the repository at this point in the history
The rpc(3) itself was not designed with multithreading in mind, but we can
actually achieve some parallelism without modifying the library and the
framework.  This transport will allow to process RPCs in threads, with
some hacks on the application side (documented in code).  We make
reentrable only one method - SVC_REPLY().  Reading and parsing of incoming
calls is still done synchronously.  But the actual processing of the calls
can be offloaded to a thread, and once finished the thread can safely
execute svc_sendreply() and the reply would be sent with the correct xid.

Differential Revision:	https://reviews.freebsd.org/D48569
  • Loading branch information
glebius committed Feb 1, 2025
1 parent 75a884f commit c62ae12
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 14 deletions.
7 changes: 7 additions & 0 deletions include/rpc/svc.h
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,13 @@ extern SVCXPRT *svcunixfd_create(int, u_int, u_int);
*/
extern SVCXPRT *svc_nl_create(const char *);

/*
* Arguments to SVC_CONTROL(svc_nl)
*/
enum {
SVCNL_GET_XIDKEY = 1, /* obtain pthread specific key for xid */
};

/*
* Memory based rpc (for speed check and testing)
*/
Expand Down
100 changes: 86 additions & 14 deletions lib/libc/rpc/svc_nl.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include <rpc/rpc.h>
#include <rpc/clnt_nl.h>

Expand All @@ -37,6 +38,7 @@
#include <netlink/netlink_snl_generic.h>

#include "rpc_com.h"
#include "libc_private.h"

/*
* RPC server to serve a kernel RPC client(s) over netlink(4). See clnt_nl.c
Expand All @@ -54,6 +56,7 @@ static bool_t svc_nl_reply(SVCXPRT *, struct rpc_msg *);
static enum xprt_stat svc_nl_stat(SVCXPRT *);
static bool_t svc_nl_getargs(SVCXPRT *, xdrproc_t, void *);
static bool_t svc_nl_freeargs(SVCXPRT *, xdrproc_t, void *);
static bool_t svc_nl_control(SVCXPRT *, const u_int, void *);

static struct xp_ops nl_ops = {
.xp_recv = svc_nl_recv,
Expand All @@ -63,11 +66,15 @@ static struct xp_ops nl_ops = {
.xp_freeargs = svc_nl_freeargs,
.xp_destroy = svc_nl_destroy,
};
static struct xp_ops2 nl_ops2 = {
.xp_control = svc_nl_control,
};

struct nl_softc {
struct snl_state snl;
XDR xdrs;
struct nlmsghdr *hdr;
pthread_key_t xidkey;
size_t mlen;
enum xprt_stat stat;
uint32_t xid;
Expand Down Expand Up @@ -108,9 +115,15 @@ svc_nl_create(const char *service)
sc->stat = XPRT_IDLE,
sc->family = family;

if (__isthreaded &&
(pthread_key_create(&sc->xidkey, NULL) != 0 ||
pthread_setspecific(sc->xidkey, &sc->xid) != 0))
goto fail;

xprt->xp_fd = sc->snl.fd,
xprt->xp_p1 = sc,
xprt->xp_ops = &nl_ops,
xprt->xp_ops2 = &nl_ops2,
xprt->xp_rtaddr = (struct netbuf){
.maxlen = sizeof(struct sockaddr_nl),
.len = sizeof(struct sockaddr_nl),
Expand Down Expand Up @@ -208,26 +221,62 @@ svc_nl_recv(SVCXPRT *xprt, struct rpc_msg *msg)
return (FALSE);
}

/*
* Reenterable reply method. Note that both the softc and xprt are declared
* const. The qualifier for xprt is commented out to match the library
* prototype. If doing any substantial changes to the function please
* temporarily uncomment the const for xprt and check your changes.
*
* Applications that want to use svc_nl_reply in a spawned thread context
* should do the following hacks in self:
* 1) - Create xprt with svc_nl_create() with libc in threaded mode, e.g.
* at least one pthread_create() shall happen before svc_nl_create().
* - After xprt creation query it for the pthread_key_t with the
* SVCNL_GET_XIDKEY control and save this key.
* 2) In the RPC function body that wants to become multithreaded:
* - Make a copy of the arguments and of the xid with help of
* pthread_getspecific() using the key.
* - pthread_create() the worker function, pointing it at the copy of
* arguments and xid.
* - return FALSE, so that RPC generated code doesn't do anything.
* 3) In the spawned thread:
* - Use arguments provided in the copy by the parent.
* - Allocate appropriately typed result on stack.
* - *** do the actual work ***
* - Populate the on-stack result same way as pointed result is populated
* in a regular RPC function.
* - Point the thread specific storage to the copy of xid provided by the
* parent with help of pthread_setspecific().
* - Call svc_sendreply() just like the rpcgen(1) generated code does.
*
* If all done correctly svc_nl_reply() will use thread specific xid for
* a call that was processed asynchronously and most recent xid when entered
* synchronously. So you can make only some methods of your application
* reentrable, keeping others as is.
*/

static bool_t
svc_nl_reply(SVCXPRT *xprt, struct rpc_msg *msg)
svc_nl_reply(/* const */ SVCXPRT *xprt, struct rpc_msg *msg)
{
struct nl_softc *sc = xprt->xp_p1;
const struct nl_softc *sc = xprt->xp_p1;
struct snl_state snl;
struct snl_writer nw;
XDR xdrs;
struct nlattr *body;
bool_t rv;

msg->rm_xid = sc->xid;
msg->rm_xid = __isthreaded ?
*(uint32_t *)pthread_getspecific(sc->xidkey) :
sc->xid;

if (__predict_false(!snl_clone(&snl, &sc->snl)))
return (FALSE);
snl_init_writer(&sc->snl, &nw);
snl_init_writer(&snl, &nw);
snl_create_genl_msg_request(&nw, sc->family, RPCNL_REPLY);
snl_add_msg_attr_u32(&nw, RPCNL_REPLY_GROUP, sc->group);
body = snl_reserve_msg_attr_raw(&nw, RPCNL_REPLY_BODY, RPC_MAXDATASIZE);

xdrmem_create(&sc->xdrs, (char *)(body + 1), RPC_MAXDATASIZE,
XDR_ENCODE);
xdrmem_create(&xdrs, (char *)(body + 1), RPC_MAXDATASIZE, XDR_ENCODE);

if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
Expand All @@ -240,35 +289,58 @@ svc_nl_reply(SVCXPRT *xprt, struct rpc_msg *msg)
msg->acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
msg->acpted_rply.ar_results.where = NULL;

pos = xdr_getpos(&sc->xdrs);
if (!xdr_replymsg(&sc->xdrs, msg) ||
!SVCAUTH_WRAP(&SVC_AUTH(xprt), &sc->xdrs, xdr_proc,
pos = xdr_getpos(&xdrs);
if (!xdr_replymsg(&xdrs, msg) ||
!SVCAUTH_WRAP(&SVC_AUTH(xprt), &xdrs, xdr_proc,
xdr_where)) {
xdr_setpos(&sc->xdrs, pos);
xdr_setpos(&xdrs, pos);
rv = FALSE;
} else
rv = TRUE;
} else
rv = xdr_replymsg(&sc->xdrs, msg);
rv = xdr_replymsg(&xdrs, msg);

if (rv) {
/* snl_finalize_msg() really doesn't work for us */
body->nla_len = sizeof(struct nlattr) + xdr_getpos(&sc->xdrs);
body->nla_len = sizeof(struct nlattr) + xdr_getpos(&xdrs);
nw.hdr->nlmsg_len = ((char *)body - (char *)nw.hdr) +
body->nla_len;
nw.hdr->nlmsg_type = sc->family;
nw.hdr->nlmsg_flags = NLM_F_REQUEST;
nw.hdr->nlmsg_seq = sc->xid;
nw.hdr->nlmsg_seq = msg->rm_xid;
if (write(xprt->xp_fd, nw.hdr, nw.hdr->nlmsg_len) !=
nw.hdr->nlmsg_len)
DIE(sc);
DIE(__DECONST(struct nl_softc *, sc));
}

snl_free(&snl);

return (rv);
}

static bool_t
svc_nl_control(SVCXPRT *xprt, const u_int req, void *v)
{
struct nl_softc *sc = xprt->xp_p1;

switch (req) {
case SVCNL_GET_XIDKEY:
if (!__isthreaded) {
/*
* Report to application that it had created xprt not
* in threaded mode, but definitly plans to use it with
* threads. If it tries so, it would very likely crash.
*/
errno = EDOOFUS;
DIE(sc);
};
*(pthread_key_t *)v = sc->xidkey;
return (TRUE);
default:
return (FALSE);
}
}

static enum xprt_stat
svc_nl_stat(SVCXPRT *xprt)
{
Expand Down

0 comments on commit c62ae12

Please sign in to comment.