From 08a80f5af1f525cab8bd98c08226c4e729e4db5e Mon Sep 17 00:00:00 2001 From: Razvan Crainea Date: Tue, 20 Aug 2024 18:48:32 +0300 Subject: [PATCH] rtpengine: handle notifications through IPC This way we also have script routes in the notification process, preventing it from crashing when events are raised. Thanks go to Norm Brandinger for reporting it! --- modules/rtpengine/rtpengine.c | 80 ++++++++++++++++++++++------------- 1 file changed, 50 insertions(+), 30 deletions(-) diff --git a/modules/rtpengine/rtpengine.c b/modules/rtpengine/rtpengine.c index bb395fcd07c..39ec8048a65 100644 --- a/modules/rtpengine/rtpengine.c +++ b/modules/rtpengine/rtpengine.c @@ -82,6 +82,9 @@ #include "rtpengine.h" #include "rtpengine_funcs.h" #include "bencode.h" +#include "../../reactor_defs.h" +#include "../../reactor_proc.h" +#include "../../io_wait.h" #if !defined(AF_LOCAL) #define AF_LOCAL AF_UNIX @@ -376,6 +379,7 @@ struct rtpe_set_head **rtpe_set_list =0; struct rtpe_set **default_rtpe_set=0; static str rtpengine_notify_sock; +static short rtpengine_notify_port; static str rtpengine_notify_event_name = str_init("E_RTPENGINE_NOTIFICATION"); static event_id_t rtpengine_notify_event = EVI_ERROR; @@ -738,7 +742,7 @@ static const dep_export_t deps = { static const proc_export_t procs[] = { {"RTPEngine notification receiver", 0, 0, rtpengine_notify_process, 1, - PROC_FLAG_INITCHILD}, + PROC_FLAG_INITCHILD|PROC_FLAG_HAS_IPC|PROC_FLAG_NEEDS_SCRIPT}, {0,0,0,0,0,0} }; @@ -4118,6 +4122,43 @@ static void rtpengine_raise_event(int sender, void *p) #define RTPENGINE_DGRAM_BUF 35536 +static int rtpengine_io_callback(int fd, void *fs, int was_timeout) +{ + int ret; + char *p; + char buffer[RTPENGINE_DGRAM_BUF]; + + do + ret = read(fd, buffer, RTPENGINE_DGRAM_BUF); + while (ret == -1 && errno == EINTR); + if (ret < 0) { + LM_ERR("problem reading on socket %s:%u (%s:%d)\n", + rtpengine_notify_sock.s, rtpengine_notify_port, strerror(errno), errno); + return -1; + } + + if (!evi_probe_event(rtpengine_notify_event)) { + LM_DBG("nothing to do - nobody is listening!\n"); + return 0; + } + + p = shm_malloc(ret + 1); + if (!p) { + /* coverity[string_null] - false positive CID #211356 */ + LM_ERR("could not allocate %d for buffer %.*s\n", ret, ret, buffer); + return -1; + } + memcpy(p, buffer, ret); + p[ret] = '\0'; + + LM_INFO("dispatching buffer: %s\n", p); + if (ipc_dispatch_rpc(rtpengine_raise_event, p) < 0) { + LM_ERR("could not dispatch notification job!\n"); + shm_free(p); + } + return 0; +} + static void rtpengine_notify_process(int rank) { int ret; @@ -4126,7 +4167,6 @@ static void rtpengine_notify_process(int rank) unsigned int port; static int rtpengine_notify_fd; union sockaddr_union ss; - char buffer[RTPENGINE_DGRAM_BUF]; p = q_memchr(rtpengine_notify_sock.s, ':', rtpengine_notify_sock.len); if (!p) { @@ -4173,37 +4213,17 @@ static void rtpengine_notify_process(int rank) goto end; } - for (;;) { - do - ret = read(rtpengine_notify_fd, buffer, RTPENGINE_DGRAM_BUF); - while (ret == -1 && errno == EINTR); - if (ret < 0) { - LM_ERR("problem reading on socket %s:%u (%s:%d)\n", - rtpengine_notify_sock.s, port, strerror(errno), errno); - goto end; - } - - if (!evi_probe_event(rtpengine_notify_event)) { - LM_DBG("nothing to do - nobody is listening!\n"); - continue; - } - - p = shm_malloc(ret + 1); - if (!p) { - /* coverity[string_null] - false positive CID #211356 */ - LM_ERR("could not allocate %d for buffer %.*s\n", ret, ret, buffer); - continue; - } - memcpy(p, buffer, ret); - p[ret] = '\0'; + if (reactor_proc_init("RTPengine events") < 0) { + LM_ERR("failed to init the RTPengine events\n"); + goto end; + } - LM_INFO("dispatching buffer: %s\n", p); - if (ipc_dispatch_rpc(rtpengine_raise_event, p) < 0) { - LM_ERR("could not dispatch notification job!\n"); - shm_free(p); - } + if (reactor_proc_add_fd(rtpengine_notify_fd, rtpengine_io_callback, NULL) < 0) { + LM_CRIT("failed to add RTPengine listen socket to reactor\n"); + goto end; } + reactor_proc_loop(); end: close(rtpengine_notify_fd); }