diff --git a/modules/rtpengine/rtpengine.c b/modules/rtpengine/rtpengine.c index bb395fcd07c..39ec8048a65 100644 --- a/modules/rtpengine/rtpengine.c +++ b/modules/rtpengine/rtpengine.c @@ -82,6 +82,9 @@ #include "rtpengine.h" #include "rtpengine_funcs.h" #include "bencode.h" +#include "../../reactor_defs.h" +#include "../../reactor_proc.h" +#include "../../io_wait.h" #if !defined(AF_LOCAL) #define AF_LOCAL AF_UNIX @@ -376,6 +379,7 @@ struct rtpe_set_head **rtpe_set_list =0; struct rtpe_set **default_rtpe_set=0; static str rtpengine_notify_sock; +static short rtpengine_notify_port; static str rtpengine_notify_event_name = str_init("E_RTPENGINE_NOTIFICATION"); static event_id_t rtpengine_notify_event = EVI_ERROR; @@ -738,7 +742,7 @@ static const dep_export_t deps = { static const proc_export_t procs[] = { {"RTPEngine notification receiver", 0, 0, rtpengine_notify_process, 1, - PROC_FLAG_INITCHILD}, + PROC_FLAG_INITCHILD|PROC_FLAG_HAS_IPC|PROC_FLAG_NEEDS_SCRIPT}, {0,0,0,0,0,0} }; @@ -4118,6 +4122,43 @@ static void rtpengine_raise_event(int sender, void *p) #define RTPENGINE_DGRAM_BUF 35536 +static int rtpengine_io_callback(int fd, void *fs, int was_timeout) +{ + int ret; + char *p; + char buffer[RTPENGINE_DGRAM_BUF]; + + do + ret = read(fd, buffer, RTPENGINE_DGRAM_BUF); + while (ret == -1 && errno == EINTR); + if (ret < 0) { + LM_ERR("problem reading on socket %s:%u (%s:%d)\n", + rtpengine_notify_sock.s, rtpengine_notify_port, strerror(errno), errno); + return -1; + } + + if (!evi_probe_event(rtpengine_notify_event)) { + LM_DBG("nothing to do - nobody is listening!\n"); + return 0; + } + + p = shm_malloc(ret + 1); + if (!p) { + /* coverity[string_null] - false positive CID #211356 */ + LM_ERR("could not allocate %d for buffer %.*s\n", ret, ret, buffer); + return -1; + } + memcpy(p, buffer, ret); + p[ret] = '\0'; + + LM_INFO("dispatching buffer: %s\n", p); + if (ipc_dispatch_rpc(rtpengine_raise_event, p) < 0) { + LM_ERR("could not dispatch notification job!\n"); + shm_free(p); + } + return 0; +} + static void rtpengine_notify_process(int rank) { int ret; @@ -4126,7 +4167,6 @@ static void rtpengine_notify_process(int rank) unsigned int port; static int rtpengine_notify_fd; union sockaddr_union ss; - char buffer[RTPENGINE_DGRAM_BUF]; p = q_memchr(rtpengine_notify_sock.s, ':', rtpengine_notify_sock.len); if (!p) { @@ -4173,37 +4213,17 @@ static void rtpengine_notify_process(int rank) goto end; } - for (;;) { - do - ret = read(rtpengine_notify_fd, buffer, RTPENGINE_DGRAM_BUF); - while (ret == -1 && errno == EINTR); - if (ret < 0) { - LM_ERR("problem reading on socket %s:%u (%s:%d)\n", - rtpengine_notify_sock.s, port, strerror(errno), errno); - goto end; - } - - if (!evi_probe_event(rtpengine_notify_event)) { - LM_DBG("nothing to do - nobody is listening!\n"); - continue; - } - - p = shm_malloc(ret + 1); - if (!p) { - /* coverity[string_null] - false positive CID #211356 */ - LM_ERR("could not allocate %d for buffer %.*s\n", ret, ret, buffer); - continue; - } - memcpy(p, buffer, ret); - p[ret] = '\0'; + if (reactor_proc_init("RTPengine events") < 0) { + LM_ERR("failed to init the RTPengine events\n"); + goto end; + } - LM_INFO("dispatching buffer: %s\n", p); - if (ipc_dispatch_rpc(rtpengine_raise_event, p) < 0) { - LM_ERR("could not dispatch notification job!\n"); - shm_free(p); - } + if (reactor_proc_add_fd(rtpengine_notify_fd, rtpengine_io_callback, NULL) < 0) { + LM_CRIT("failed to add RTPengine listen socket to reactor\n"); + goto end; } + reactor_proc_loop(); end: close(rtpengine_notify_fd); }