Skip to content

Commit

Permalink
add expert parameters:
Browse files Browse the repository at this point in the history
 - t2_boost_enable_call_find_threads yes/NO - enable t2 thread splitting for searching and creating call instances
 - t2_boost_max_next_call_threads - maximum number of additional t2 threads for searching, creating and processing calls (default 3)
 - storing_cdr_max_next_threads - maximum number of additional threads for prepare storing of cdr records (default 3)
  • Loading branch information
rbucek committed Jan 4, 2021
1 parent 628fea1 commit 6372bbf
Show file tree
Hide file tree
Showing 9 changed files with 335 additions and 215 deletions.
221 changes: 141 additions & 80 deletions calltable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,9 @@ extern int opt_rtp_check_both_sides_by_sdp;
extern int opt_hash_modify_queue_length_ms;
extern int opt_mysql_enable_multiple_rows_insert;
extern int opt_mysql_max_multiple_rows_insert;
extern PreProcessPacket *preProcessPacketCallX[];
extern PreProcessPacket **preProcessPacketCallX;
extern int preProcessPacketCallX_count;
extern volatile PreProcessPacket::eCallX_state preProcessPacketCallX_state;
extern bool opt_disable_sdp_multiplication_warning;
extern bool opt_save_energylevels;

Expand Down Expand Up @@ -213,6 +215,8 @@ extern unsigned int glob_ssl_calls;
extern bool opt_cdr_partition;
extern bool opt_cdr_partition_by_hours;
extern int opt_t2_boost;
extern int opt_t2_boost_call_find_threads;
extern int opt_t2_boost_call_threads;
extern bool opt_time_precision_in_ms;

extern cBilling *billing;
Expand Down Expand Up @@ -6954,30 +6958,32 @@ Call::saveToDb(bool enableBatchIfPossible) {
counterSqlStore % opt_mysqlstore_max_threads_cdr :
0;
++counterSqlStore;
if(useCsvStoreFormat()) {
if(existsChartsCacheServer()) {
SqlDb_row::SqlDb_rowField *f_store_flags = cdr.add(_sf_db, "store_flags");
string query_str_cdr = MYSQL_MAIN_INSERT_CSV_HEADER("cdr") + cdr.implodeFields(",", "\"") + MYSQL_CSV_END +
MYSQL_MAIN_INSERT_CSV_ROW("cdr") + cdr.implodeContentTypeToCsv(true) + MYSQL_CSV_END;
sqlStore->query_lock((query_str_cdr + query_str).c_str(), STORE_PROC_ID_CDR, storeId2);
f_store_flags->content = intToString(_sf_charts_cache);
f_store_flags->ifv.v._int = _sf_charts_cache;
query_str_cdr = MYSQL_MAIN_INSERT_CSV_HEADER("cdr") + cdr.implodeFields(",", "\"") + MYSQL_CSV_END +
MYSQL_MAIN_INSERT_CSV_ROW("cdr") + cdr.implodeContentTypeToCsv(true) + MYSQL_CSV_END;
sqlStore->query_lock((query_str_cdr + query_str).c_str(),
STORE_PROC_ID_CHARTS_CACHE,
opt_mysqlstore_max_threads_charts_cache > 1 &&
sqlStore->getSize(STORE_PROC_ID_CHARTS_CACHE, 0) > 1000 ?
counterSqlStore % opt_mysqlstore_max_threads_charts_cache :
0);
if(!sverb.suppress_cdr_insert) {
if(useCsvStoreFormat()) {
if(existsChartsCacheServer()) {
SqlDb_row::SqlDb_rowField *f_store_flags = cdr.add(_sf_db, "store_flags");
string query_str_cdr = MYSQL_MAIN_INSERT_CSV_HEADER("cdr") + cdr.implodeFields(",", "\"") + MYSQL_CSV_END +
MYSQL_MAIN_INSERT_CSV_ROW("cdr") + cdr.implodeContentTypeToCsv(true) + MYSQL_CSV_END;
sqlStore->query_lock((query_str_cdr + query_str).c_str(), STORE_PROC_ID_CDR, storeId2);
f_store_flags->content = intToString(_sf_charts_cache);
f_store_flags->ifv.v._int = _sf_charts_cache;
query_str_cdr = MYSQL_MAIN_INSERT_CSV_HEADER("cdr") + cdr.implodeFields(",", "\"") + MYSQL_CSV_END +
MYSQL_MAIN_INSERT_CSV_ROW("cdr") + cdr.implodeContentTypeToCsv(true) + MYSQL_CSV_END;
sqlStore->query_lock((query_str_cdr + query_str).c_str(),
STORE_PROC_ID_CHARTS_CACHE,
opt_mysqlstore_max_threads_charts_cache > 1 &&
sqlStore->getSize(STORE_PROC_ID_CHARTS_CACHE, 0) > 1000 ?
counterSqlStore % opt_mysqlstore_max_threads_charts_cache :
0);
} else {
cdr.add(_sf_db | (useChartsCacheInStore() ? _sf_charts_cache : 0), "store_flags");
string query_str_cdr = MYSQL_MAIN_INSERT_CSV_HEADER("cdr") + cdr.implodeFields(",", "\"") + MYSQL_CSV_END +
MYSQL_MAIN_INSERT_CSV_ROW("cdr") + cdr.implodeContentTypeToCsv(true) + MYSQL_CSV_END;
sqlStore->query_lock((query_str_cdr + query_str).c_str(), STORE_PROC_ID_CDR, storeId2);
}
} else {
cdr.add(_sf_db | (useChartsCacheInStore() ? _sf_charts_cache : 0), "store_flags");
string query_str_cdr = MYSQL_MAIN_INSERT_CSV_HEADER("cdr") + cdr.implodeFields(",", "\"") + MYSQL_CSV_END +
MYSQL_MAIN_INSERT_CSV_ROW("cdr") + cdr.implodeContentTypeToCsv(true) + MYSQL_CSV_END;
sqlStore->query_lock((query_str_cdr + query_str).c_str(), STORE_PROC_ID_CDR, storeId2);
sqlStore->query_lock(query_str.c_str(), STORE_PROC_ID_CDR, storeId2);
}
} else {
sqlStore->query_lock(query_str.c_str(), STORE_PROC_ID_CDR, storeId2);
}

//cout << endl << endl << query_str << endl << endl << endl;
Expand Down Expand Up @@ -8755,9 +8761,6 @@ Calltable::Calltable(SqlDb *sqlDb) {
#endif
_sync_lock_calls_hash = 0;
_sync_lock_calls_listMAP = 0;
for(int i = 0; i < preProcessPacketCallX_count; i++) {
_sync_lock_calls_listMAP_X[i] = 0;
}
_sync_lock_calls_mergeMAP = 0;
_sync_lock_registers_listMAP = 0;
_sync_lock_calls_queue = 0;
Expand All @@ -8772,6 +8775,17 @@ Calltable::Calltable(SqlDb *sqlDb) {
_sync_lock_process_ss7_listmap = 0;
_sync_lock_process_ss7_queue = 0;

if(preProcessPacketCallX_count > 0) {
calls_listMAP_X = new FILE_LINE(0) map<string, Call*>[preProcessPacketCallX_count];
_sync_lock_calls_listMAP_X = new FILE_LINE(0) volatile int[preProcessPacketCallX_count];
for(int i = 0; i < preProcessPacketCallX_count; i++) {
_sync_lock_calls_listMAP_X[i] = 0;
}
} else {
calls_listMAP_X = NULL;
_sync_lock_calls_listMAP_X = NULL;
}

extern int opt_audioqueue_threads_max;
audioQueueThreadsMax = min(max(2l, sysconf( _SC_NPROCESSORS_ONLN ) - 1), (long)opt_audioqueue_threads_max);
audioQueueTerminating = 0;
Expand Down Expand Up @@ -8818,6 +8832,13 @@ Calltable::~Calltable() {
pthread_mutex_destroy(&registers_listMAPlock);
*/

if(calls_listMAP_X) {
delete [] calls_listMAP_X;
}
if(_sync_lock_calls_listMAP_X) {
delete [] _sync_lock_calls_listMAP_X;
}

if(asyncSystemCommand) {
delete asyncSystemCommand;
}
Expand Down Expand Up @@ -10333,32 +10354,34 @@ Calltable::getCallTableJson(char *params, bool *zip) {
}
unsigned int now = time(NULL);

unsigned activeCallsMax = getApproxCountCalls();
unsigned activeCallsMax = getCountCalls();
activeCallsMax += activeCallsMax / 4;
Call **activeCalls = new FILE_LINE(0) Call*[activeCallsMax];
unsigned activeCallsCount = 0;

for(int passListMap = -1; passListMap < (opt_t2_boost == 2 ? preProcessPacketCallX_count : 0); passListMap++) {
map<string, Call*> *_calls_listMAP;
if(passListMap == -1) {
lock_calls_listMAP();
_calls_listMAP = &calls_listMAP;
} else {
lock_calls_listMAP_X(passListMap);
_calls_listMAP = &calls_listMAP_X[passListMap];
}
list<Call*>::iterator callIT1;
map<string, Call*>::iterator callMAPIT1;
map<sStreamIds2, Call*>::iterator callMAPIT2;
for(int passTypeCall = 0; passTypeCall < (passListMap == -1 ? 2 : 1); passTypeCall++) {
int typeCall = passTypeCall == 0 ? INVITE : MGCP;
for(int passTypeCall = 0; passTypeCall < 2; passTypeCall++) {
int typeCall = passTypeCall == 0 ? INVITE : MGCP;
for(int passListMap = -1; passListMap < (typeCall == INVITE && useCallFindX() ? preProcessPacketCallX_count : 0); passListMap++) {
map<string, Call*> *_calls_listMAP;
list<Call*>::iterator callIT1;
map<string, Call*>::iterator callMAPIT1;
map<sStreamIds2, Call*>::iterator callMAPIT2;
if(typeCall == INVITE) {
if(opt_call_id_alternative[0]) {
lock_calls_listMAP();
callIT1 = calltable->calls_list.begin();
} else {
if(passListMap == -1) {
lock_calls_listMAP();
_calls_listMAP = &calls_listMAP;
} else {
lock_calls_listMAP_X(passListMap);
_calls_listMAP = &calls_listMAP_X[passListMap];
}
callMAPIT1 = _calls_listMAP->begin();
}
} else {
lock_calls_listMAP();
callMAPIT2 = calltable->calls_by_stream_callid_listMAP.begin();
}
while(typeCall == INVITE ?
Expand Down Expand Up @@ -10396,11 +10419,19 @@ Calltable::getCallTableJson(char *params, bool *zip) {
++callMAPIT2;
}
}
}
if(passListMap == -1) {
unlock_calls_listMAP();
} else {
unlock_calls_listMAP_X(passListMap);
if(typeCall == INVITE) {
if(opt_call_id_alternative[0]) {
unlock_calls_listMAP();
} else {
if(passListMap == -1) {
unlock_calls_listMAP();
} else {
unlock_calls_listMAP_X(passListMap);
}
}
} else {
unlock_calls_listMAP();
}
}
}
unsigned custom_headers_size = 0;
Expand Down Expand Up @@ -10682,7 +10713,7 @@ Calltable::cleanup_calls( struct timeval *currtime, bool forceClose, const char
syslog(LOG_NOTICE, "call Calltable::cleanup_calls");
}

unsigned closeCallsMax = getApproxCountCalls();
unsigned closeCallsMax = getCountCalls();
if(!closeCallsMax) {
return 0;
}
Expand All @@ -10691,28 +10722,30 @@ Calltable::cleanup_calls( struct timeval *currtime, bool forceClose, const char
unsigned closeCallsCount = 0;

int rejectedCalls_count = 0;
for(int passListMap = -1; passListMap < (opt_t2_boost == 2 ? preProcessPacketCallX_count : 0); passListMap++) {
map<string, Call*> *_calls_listMAP;
if(passListMap == -1) {
lock_calls_listMAP();
_calls_listMAP = &calls_listMAP;
} else {
lock_calls_listMAP_X(passListMap);
_calls_listMAP = &calls_listMAP_X[passListMap];
}
list<Call*>::iterator callIT1;
map<string, Call*>::iterator callMAPIT1;
map<sStreamIds2, Call*>::iterator callMAPIT2;
for(int passTypeCall = 0; passTypeCall < (passListMap == -1 ? 2 : 1); passTypeCall++) {
int typeCall = passTypeCall == 0 ? INVITE : MGCP;
for(int passTypeCall = 0; passTypeCall < 2; passTypeCall++) {
int typeCall = passTypeCall == 0 ? INVITE : MGCP;
for(int passListMap = -1; passListMap < (typeCall == INVITE && useCallFindX() ? preProcessPacketCallX_count : 0); passListMap++) {
map<string, Call*> *_calls_listMAP;
list<Call*>::iterator callIT1;
map<string, Call*>::iterator callMAPIT1;
map<sStreamIds2, Call*>::iterator callMAPIT2;
if(typeCall == INVITE) {
if(opt_call_id_alternative[0]) {
callIT1 = calls_list.begin();
lock_calls_listMAP();
callIT1 = calltable->calls_list.begin();
} else {
if(passListMap == -1) {
lock_calls_listMAP();
_calls_listMAP = &calls_listMAP;
} else {
lock_calls_listMAP_X(passListMap);
_calls_listMAP = &calls_listMAP_X[passListMap];
}
callMAPIT1 = _calls_listMAP->begin();
}
} else {
callMAPIT2 = calls_by_stream_callid_listMAP.begin();
lock_calls_listMAP();
callMAPIT2 = calltable->calls_by_stream_callid_listMAP.begin();
}
while(typeCall == INVITE ?
(opt_call_id_alternative[0] ?
Expand Down Expand Up @@ -10815,11 +10848,19 @@ Calltable::cleanup_calls( struct timeval *currtime, bool forceClose, const char
}
}
}
}
if(passListMap == -1) {
unlock_calls_listMAP();
} else {
unlock_calls_listMAP_X(passListMap);
if(typeCall == INVITE) {
if(opt_call_id_alternative[0]) {
unlock_calls_listMAP();
} else {
if(passListMap == -1) {
unlock_calls_listMAP();
} else {
unlock_calls_listMAP_X(passListMap);
}
}
} else {
unlock_calls_listMAP();
}
}
}
for(unsigned i = 0; i < closeCallsCount; i++) {
Expand All @@ -10829,7 +10870,7 @@ Calltable::cleanup_calls( struct timeval *currtime, bool forceClose, const char
}
// Close RTP dump file ASAP to save file handles
if((!currtime && is_terminating()) ||
(opt_t2_boost && preProcessPacketCallX[0]->isActiveOutThread())) {
(useCallX() && preProcessPacketCallX && preProcessPacketCallX[0]->isActiveOutThread())) {
call->getPcap()->close();
call->getPcapSip()->close();
}
Expand Down Expand Up @@ -11027,21 +11068,21 @@ void Calltable::addSystemCommand(const char *command) {
}
}

unsigned Calltable::getApproxCountCalls() {
unsigned count = 0;
for(int passListMap = -1; passListMap < (opt_t2_boost == 2 ? preProcessPacketCallX_count : 0); passListMap++) {
map<string, Call*> *_calls_listMAP;
if(passListMap == -1) {
_calls_listMAP = &calls_listMAP;
} else {
_calls_listMAP = &calls_listMAP_X[passListMap];
}
for(int passTypeCall = 0; passTypeCall < (passListMap == -1 ? 2 : 1); passTypeCall++) {
int typeCall = passTypeCall == 0 ? INVITE : MGCP;
size_t Calltable::getCountCalls() {
size_t count = 0;
for(int passTypeCall = 0; passTypeCall < 2; passTypeCall++) {
int typeCall = passTypeCall == 0 ? INVITE : MGCP;
for(int passListMap = -1; passListMap < (typeCall == INVITE && useCallFindX() ? preProcessPacketCallX_count : 0); passListMap++) {
if(typeCall == INVITE) {
if(opt_call_id_alternative[0]) {
count += calls_list.size();
} else {
map<string, Call*> *_calls_listMAP;
if(passListMap == -1) {
_calls_listMAP = &calls_listMAP;
} else {
_calls_listMAP = &calls_listMAP_X[passListMap];
}
count += _calls_listMAP->size();
}
} else {
Expand All @@ -11052,6 +11093,26 @@ unsigned Calltable::getApproxCountCalls() {
return(count);
}

bool Calltable::enableCallX() {
return(opt_t2_boost && opt_t2_boost_call_threads > 0);
}

bool Calltable::useCallX() {
return(enableCallX() &&
(preProcessPacketCallX_state == PreProcessPacket::callx_process ||
preProcessPacketCallX_state == PreProcessPacket::callx_find));
}

bool Calltable::enableCallFindX() {
return(opt_t2_boost && opt_t2_boost_call_threads > 0 && opt_t2_boost_call_find_threads &&
!opt_call_id_alternative[0]);
}

bool Calltable::useCallFindX() {
return(enableCallFindX() &&
preProcessPacketCallX_state == PreProcessPacket::callx_find);
}


void Call::saveregister(struct timeval *currtime) {
((Calltable*)calltable)->lock_registers_listMAP();
Expand Down
26 changes: 7 additions & 19 deletions calltable.h
Original file line number Diff line number Diff line change
Expand Up @@ -2002,7 +2002,7 @@ class Calltable {
queue<string> files_sqlqueue; //!< this queue is used for asynchronous storing CDR by the worker thread
list<Call*> calls_list;
map<string, Call*> calls_listMAP;
map<string, Call*> calls_listMAP_X[preProcessPacketCallX_count];
map<string, Call*> *calls_listMAP_X;
map<sStreamIds2, Call*> calls_by_stream_callid_listMAP;
map<sStreamId2, Call*> calls_by_stream_id2_listMAP;
map<sStreamId, Call*> calls_by_stream_listMAP;
Expand Down Expand Up @@ -2090,21 +2090,11 @@ class Calltable {
Call *add_mgcp(sMgcpRequest *request, time_t time, vmIP saddr, vmPort sport, vmIP daddr, vmPort dport,
pcap_t *handle, int dlt, int sensorId);

size_t calls_list_count() {
extern char opt_call_id_alternative[256];
extern int opt_t2_boost;
if(opt_call_id_alternative[0]) {
return(calls_list.size());
} else if(opt_t2_boost == 2) {
size_t count = 0;
for(int i = 0; i < preProcessPacketCallX_count; i++) {
count += calls_listMAP_X[i].size();
}
return(count);
} else {
return(calls_listMAP.size());
}
}
size_t getCountCalls();
bool enableCallX();
bool useCallX();
bool enableCallFindX();
bool useCallFindX();

/**
* @brief find Call by call_id
Expand Down Expand Up @@ -2510,8 +2500,6 @@ class Calltable {

void addSystemCommand(const char *command);

unsigned getApproxCountCalls();

private:
/*
pthread_mutex_t qlock; //!< mutex locking calls_queue
Expand All @@ -2537,7 +2525,7 @@ class Calltable {
#endif
volatile int _sync_lock_calls_hash;
volatile int _sync_lock_calls_listMAP;
volatile int _sync_lock_calls_listMAP_X[preProcessPacketCallX_count];
volatile int *_sync_lock_calls_listMAP_X;
volatile int _sync_lock_calls_mergeMAP;
volatile int _sync_lock_registers_listMAP;
volatile int _sync_lock_calls_queue;
Expand Down
Loading

0 comments on commit 6372bbf

Please sign in to comment.