rework luv work vm storage
moves work vm storage into a garbage-collected userdata.
this allows each lua state to manage its own storage.

this removes the need for a global array of work vms.
truemedian authored and zhaozg committed Feb 1, 2025
1 parent 1b29585 commit 657a08f
Showing 2 changed files with 69 additions and 65 deletions.
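As the commit message above describes, the work VM pool moves from file-level globals into a heap structure wrapped in a full userdata: the userdata gets a __gc metamethod and is anchored in the registry of the owning lua_State, so each state releases its own pool when it closes. Below is a minimal sketch of that pattern, not the luv code itself; the names (my_pool_t, my_pool_gc, my_pool_init, my_pool_get) are hypothetical, and it assumes Lua 5.2+ for lua_rawsetp/lua_rawgetp.

/* Sketch only: a per-state pool stored as a garbage-collected userdata,
 * keyed in the registry by a unique C address. Hypothetical names. */
#include <lua.h>
#include <lauxlib.h>
#include <stdlib.h>

typedef struct {
  lua_State** slots;   /* illustrative payload owned by this state */
  unsigned int nslots;
} my_pool_t;

/* __gc: runs when the owning lua_State closes and collects the userdata */
static int my_pool_gc(lua_State* L) {
  my_pool_t* pool = (my_pool_t*)lua_touserdata(L, 1);
  free(pool->slots);
  pool->nslots = 0;
  return 0;
}

static void my_pool_init(lua_State* L, unsigned int n) {
  /* metatable carrying the finalizer */
  luaL_newmetatable(L, "my_pool");
  lua_pushcfunction(L, my_pool_gc);
  lua_setfield(L, -2, "__gc");
  lua_pop(L, 1);

  /* allocate the pool as a userdata owned by L */
  my_pool_t* pool = (my_pool_t*)lua_newuserdata(L, sizeof(my_pool_t));
  pool->slots = (lua_State**)calloc(n, sizeof(lua_State*));
  pool->nslots = n;
  luaL_getmetatable(L, "my_pool");
  lua_setmetatable(L, -2);

  /* anchor it in the registry, keyed by a unique address
   * (the commit keys on &luv_work_cleanup) */
  lua_rawsetp(L, LUA_REGISTRYINDEX, &my_pool_gc);
}

static my_pool_t* my_pool_get(lua_State* L) {
  /* later lookups mirror luv_new_work in the diff: rawgetp, touserdata, pop */
  lua_rawgetp(L, LUA_REGISTRYINDEX, &my_pool_gc);
  my_pool_t* pool = (my_pool_t*)lua_touserdata(L, -1);
  lua_pop(L, 1);
  return pool;
}

Because the userdata is owned by the state that created it, its lifetime tracks that state, which is why the explicit luv_work_cleanup() call in loop_gc can be dropped in the luv.c hunk below.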
5 changes: 0 additions & 5 deletions src/luv.c
@@ -852,11 +852,6 @@ static int loop_gc(lua_State *L) {
while (uv_loop_close(loop)) {
uv_run(loop, UV_RUN_DEFAULT);
}
/* do cleanup in main thread */
lua_getglobal(L, "_THREAD");
if (lua_isnil(L, -1))
luv_work_cleanup();
lua_pop(L, 1);
return 0;
}

129 changes: 69 additions & 60 deletions src/work.c
@@ -16,12 +16,20 @@
*/
#include "private.h"

typedef struct {
lua_State** vms;
unsigned int nvms;
unsigned int idx_vms;
uv_mutex_t vm_mutex;
} luv_work_vms_t;

typedef struct {
lua_State* L; /* vm in main */
char* code; /* thread entry code */
size_t len;

int after_work_cb; /* ref, run in main ,call after work cb*/
luv_work_vms_t* vms; /* userdata owned by L, so parent thread can clean up old states */
} luv_work_ctx_t;

typedef struct {
@@ -35,16 +43,6 @@ typedef struct {

static uv_once_t once_vmkey = UV_ONCE_INIT;
static uv_key_t tls_vmkey; /* thread local storage key for Lua state */
static uv_mutex_t vm_mutex;

static unsigned int idx_vms = 0;
static unsigned int nvms = 0;
static lua_State** vms;
static lua_State* default_vms[4];

#ifndef ARRAY_SIZE
#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))
#endif

#if LUV_UV_VERSION_GEQ(1, 30, 0)
#define MAX_THREADPOOL_SIZE 1024
@@ -127,7 +125,7 @@ static int luv_work_cb(lua_State* L) {
return LUA_OK;
}

static lua_State* luv_work_acquire_vm(void)
static lua_State* luv_work_acquire_vm(luv_work_vms_t* vms)
{
lua_State* L = uv_key_get(&tls_vmkey);
if (L == NULL)
@@ -137,17 +135,35 @@
lua_pushboolean(L, 1);
lua_setglobal(L, "_THREAD");

uv_mutex_lock(&vm_mutex);
vms[idx_vms] = L;
idx_vms += 1;
uv_mutex_unlock(&vm_mutex);
uv_mutex_lock(&vms->vm_mutex);
vms->vms[vms->idx_vms] = L;
vms->idx_vms += 1;
uv_mutex_unlock(&vms->vm_mutex);
}
return L;
}

static int luv_work_cleanup(lua_State *L)
{
unsigned int i;
luv_work_vms_t *vms = (luv_work_vms_t*)lua_touserdata(L, 1);

if (!vms || vms->nvms == 0)
return 0;

for (i = 0; i < vms->nvms && vms->vms[i]; i++)
release_vm_cb(vms->vms[i]);

free(vms->vms);

uv_mutex_destroy(&vms->vm_mutex);
vms->nvms = 0;
return 0;
}

static void luv_work_cb_wrapper(uv_work_t* req) {
luv_work_t* work = (luv_work_t*)req->data;
lua_State *L = luv_work_acquire_vm();
lua_State *L = luv_work_acquire_vm(work->ctx->vms);
luv_ctx_t* lctx = luv_context(L);

// If exit is called on a thread in the thread pool, abort is called in
@@ -197,6 +213,10 @@ static int luv_new_work(lua_State* L) {
ctx = (luv_work_ctx_t*)lua_newuserdata(L, sizeof(*ctx));
memset(ctx, 0, sizeof(*ctx));

lua_rawgetp(L, LUA_REGISTRYINDEX, &luv_work_cleanup);
ctx->vms = (luv_work_vms_t*)lua_touserdata(L, -1);
lua_pop(L, 1);

ctx->len = len;
ctx->code = code;

@@ -247,7 +267,6 @@ static const luaL_Reg luv_work_ctx_methods[] = {

static void luv_key_init_once(void)
{
const char* val;
int status = uv_key_create(&tls_vmkey);
if (status != 0)
{
@@ -256,17 +275,26 @@ static void luv_key_init_once(void)
uv_err_name(status), uv_strerror(status));
abort();
}
status = uv_mutex_init(&vm_mutex);
if (status != 0)
{
fprintf(stderr, "*** threadpool not works\n");
fprintf(stderr, "Error to uv_mutex_init with %s: %s\n",
uv_err_name(status), uv_strerror(status));
abort();
}
}

static void luv_work_init(lua_State* L) {
luaL_newmetatable(L, "luv_work_ctx");
lua_pushcfunction(L, luv_work_ctx_tostring);
lua_setfield(L, -2, "__tostring");
lua_pushcfunction(L, luv_work_ctx_gc);
lua_setfield(L, -2, "__gc");
luaL_newlib(L, luv_work_ctx_methods);
lua_setfield(L, -2, "__index");
lua_pop(L, 1);

luaL_newmetatable(L, "luv_work_vms");
lua_pushcfunction(L, luv_work_cleanup);
lua_setfield(L, -2, "__gc");
lua_pop(L, 1);

/* ref to https://github.com/libuv/libuv/blob/v1.x/src/threadpool.c init_threads */
nvms = ARRAY_SIZE(default_vms);
const char* val;
unsigned int nvms = 4;
val = getenv("UV_THREADPOOL_SIZE");
if (val != NULL)
nvms = atoi(val);
@@ -275,44 +303,25 @@ static void luv_key_init_once(void)
if (nvms > MAX_THREADPOOL_SIZE)
nvms = MAX_THREADPOOL_SIZE;

vms = default_vms;
if (nvms > ARRAY_SIZE(default_vms)) {
vms = malloc(nvms * sizeof(vms[0]));
if (vms == NULL) {
nvms = ARRAY_SIZE(default_vms);
vms = default_vms;
}
memset(vms, 0, sizeof(vms[0]) * nvms);
luv_work_vms_t* vms = (luv_work_vms_t*)lua_newuserdata(L, sizeof(luv_work_vms_t));
int status = uv_mutex_init(&vms->vm_mutex);
if (status != 0)
{
fprintf(stderr, "*** threadpool not works\n");
fprintf(stderr, "Error to uv_mutex_init with %s: %s\n",
uv_err_name(status), uv_strerror(status));
abort();
}
idx_vms = 0;
}

static void luv_work_cleanup(void)
{
unsigned int i;
vms->vms = (lua_State**)calloc(nvms, sizeof(lua_State*));
vms->nvms = nvms;
vms->idx_vms = 0;

if (nvms == 0)
return;

for (i = 0; i < nvms && vms[i]; i++)
release_vm_cb(vms[i]);

if (vms != default_vms)
free(vms);

uv_mutex_destroy(&vm_mutex);
nvms = 0;
}
luaL_getmetatable(L, "luv_work_vms");
lua_setmetatable(L, -2);

static void luv_work_init(lua_State* L) {
luaL_newmetatable(L, "luv_work_ctx");
lua_pushcfunction(L, luv_work_ctx_tostring);
lua_setfield(L, -2, "__tostring");
lua_pushcfunction(L, luv_work_ctx_gc);
lua_setfield(L, -2, "__gc");
luaL_newlib(L, luv_work_ctx_methods);
lua_setfield(L, -2, "__index");
lua_pop(L, 1);
// store the luv_work_vms_t in registry
lua_rawsetp(L, LUA_REGISTRYINDEX, &luv_work_cleanup);

uv_once(&once_vmkey, luv_key_init_once);
}
