Skip to content

Commit

Permalink
rtpengine: handle notifications through IPC
Browse files Browse the repository at this point in the history
This way we also have script routes in the notification process,
preventing it from crashing when events are raised.

Thanks go to Norm Brandinger for reporting it!
  • Loading branch information
razvancrainea committed Aug 20, 2024
1 parent a2c0054 commit 08a80f5
Showing 1 changed file with 50 additions and 30 deletions.
80 changes: 50 additions & 30 deletions modules/rtpengine/rtpengine.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@
#include "rtpengine.h"
#include "rtpengine_funcs.h"
#include "bencode.h"
#include "../../reactor_defs.h"
#include "../../reactor_proc.h"
#include "../../io_wait.h"

#if !defined(AF_LOCAL)
#define AF_LOCAL AF_UNIX
Expand Down Expand Up @@ -376,6 +379,7 @@ struct rtpe_set_head **rtpe_set_list =0;
struct rtpe_set **default_rtpe_set=0;

static str rtpengine_notify_sock;
static short rtpengine_notify_port;
static str rtpengine_notify_event_name = str_init("E_RTPENGINE_NOTIFICATION");
static event_id_t rtpengine_notify_event = EVI_ERROR;

Expand Down Expand Up @@ -738,7 +742,7 @@ static const dep_export_t deps = {

static const proc_export_t procs[] = {
{"RTPEngine notification receiver", 0, 0, rtpengine_notify_process, 1,
PROC_FLAG_INITCHILD},
PROC_FLAG_INITCHILD|PROC_FLAG_HAS_IPC|PROC_FLAG_NEEDS_SCRIPT},
{0,0,0,0,0,0}
};

Expand Down Expand Up @@ -4118,6 +4122,43 @@ static void rtpengine_raise_event(int sender, void *p)

#define RTPENGINE_DGRAM_BUF 35536

static int rtpengine_io_callback(int fd, void *fs, int was_timeout)
{
int ret;
char *p;
char buffer[RTPENGINE_DGRAM_BUF];

do
ret = read(fd, buffer, RTPENGINE_DGRAM_BUF);
while (ret == -1 && errno == EINTR);
if (ret < 0) {
LM_ERR("problem reading on socket %s:%u (%s:%d)\n",
rtpengine_notify_sock.s, rtpengine_notify_port, strerror(errno), errno);
return -1;
}

if (!evi_probe_event(rtpengine_notify_event)) {
LM_DBG("nothing to do - nobody is listening!\n");
return 0;
}

p = shm_malloc(ret + 1);
if (!p) {
/* coverity[string_null] - false positive CID #211356 */
LM_ERR("could not allocate %d for buffer %.*s\n", ret, ret, buffer);
return -1;
}
memcpy(p, buffer, ret);
p[ret] = '\0';

LM_INFO("dispatching buffer: %s\n", p);
if (ipc_dispatch_rpc(rtpengine_raise_event, p) < 0) {
LM_ERR("could not dispatch notification job!\n");
shm_free(p);
}
return 0;
}

static void rtpengine_notify_process(int rank)
{
int ret;
Expand All @@ -4126,7 +4167,6 @@ static void rtpengine_notify_process(int rank)
unsigned int port;
static int rtpengine_notify_fd;
union sockaddr_union ss;
char buffer[RTPENGINE_DGRAM_BUF];

p = q_memchr(rtpengine_notify_sock.s, ':', rtpengine_notify_sock.len);
if (!p) {
Expand Down Expand Up @@ -4173,37 +4213,17 @@ static void rtpengine_notify_process(int rank)
goto end;
}

for (;;) {
do
ret = read(rtpengine_notify_fd, buffer, RTPENGINE_DGRAM_BUF);
while (ret == -1 && errno == EINTR);
if (ret < 0) {
LM_ERR("problem reading on socket %s:%u (%s:%d)\n",
rtpengine_notify_sock.s, port, strerror(errno), errno);
goto end;
}

if (!evi_probe_event(rtpengine_notify_event)) {
LM_DBG("nothing to do - nobody is listening!\n");
continue;
}

p = shm_malloc(ret + 1);
if (!p) {
/* coverity[string_null] - false positive CID #211356 */
LM_ERR("could not allocate %d for buffer %.*s\n", ret, ret, buffer);
continue;
}
memcpy(p, buffer, ret);
p[ret] = '\0';
if (reactor_proc_init("RTPengine events") < 0) {
LM_ERR("failed to init the RTPengine events\n");
goto end;
}

LM_INFO("dispatching buffer: %s\n", p);
if (ipc_dispatch_rpc(rtpengine_raise_event, p) < 0) {
LM_ERR("could not dispatch notification job!\n");
shm_free(p);
}
if (reactor_proc_add_fd(rtpengine_notify_fd, rtpengine_io_callback, NULL) < 0) {
LM_CRIT("failed to add RTPengine listen socket to reactor\n");
goto end;
}

reactor_proc_loop();
end:
close(rtpengine_notify_fd);
}
Expand Down

0 comments on commit 08a80f5

Please sign in to comment.