diff --git a/calltable.cpp b/calltable.cpp index aa418f6bd..e611b38af 100644 --- a/calltable.cpp +++ b/calltable.cpp @@ -169,7 +169,9 @@ extern int opt_rtp_check_both_sides_by_sdp; extern int opt_hash_modify_queue_length_ms; extern int opt_mysql_enable_multiple_rows_insert; extern int opt_mysql_max_multiple_rows_insert; -extern PreProcessPacket *preProcessPacketCallX[]; +extern PreProcessPacket **preProcessPacketCallX; +extern int preProcessPacketCallX_count; +extern volatile PreProcessPacket::eCallX_state preProcessPacketCallX_state; extern bool opt_disable_sdp_multiplication_warning; extern bool opt_save_energylevels; @@ -213,6 +215,8 @@ extern unsigned int glob_ssl_calls; extern bool opt_cdr_partition; extern bool opt_cdr_partition_by_hours; extern int opt_t2_boost; +extern int opt_t2_boost_call_find_threads; +extern int opt_t2_boost_call_threads; extern bool opt_time_precision_in_ms; extern cBilling *billing; @@ -6954,30 +6958,32 @@ Call::saveToDb(bool enableBatchIfPossible) { counterSqlStore % opt_mysqlstore_max_threads_cdr : 0; ++counterSqlStore; - if(useCsvStoreFormat()) { - if(existsChartsCacheServer()) { - SqlDb_row::SqlDb_rowField *f_store_flags = cdr.add(_sf_db, "store_flags"); - string query_str_cdr = MYSQL_MAIN_INSERT_CSV_HEADER("cdr") + cdr.implodeFields(",", "\"") + MYSQL_CSV_END + - MYSQL_MAIN_INSERT_CSV_ROW("cdr") + cdr.implodeContentTypeToCsv(true) + MYSQL_CSV_END; - sqlStore->query_lock((query_str_cdr + query_str).c_str(), STORE_PROC_ID_CDR, storeId2); - f_store_flags->content = intToString(_sf_charts_cache); - f_store_flags->ifv.v._int = _sf_charts_cache; - query_str_cdr = MYSQL_MAIN_INSERT_CSV_HEADER("cdr") + cdr.implodeFields(",", "\"") + MYSQL_CSV_END + - MYSQL_MAIN_INSERT_CSV_ROW("cdr") + cdr.implodeContentTypeToCsv(true) + MYSQL_CSV_END; - sqlStore->query_lock((query_str_cdr + query_str).c_str(), - STORE_PROC_ID_CHARTS_CACHE, - opt_mysqlstore_max_threads_charts_cache > 1 && - sqlStore->getSize(STORE_PROC_ID_CHARTS_CACHE, 0) > 1000 ? - counterSqlStore % opt_mysqlstore_max_threads_charts_cache : - 0); + if(!sverb.suppress_cdr_insert) { + if(useCsvStoreFormat()) { + if(existsChartsCacheServer()) { + SqlDb_row::SqlDb_rowField *f_store_flags = cdr.add(_sf_db, "store_flags"); + string query_str_cdr = MYSQL_MAIN_INSERT_CSV_HEADER("cdr") + cdr.implodeFields(",", "\"") + MYSQL_CSV_END + + MYSQL_MAIN_INSERT_CSV_ROW("cdr") + cdr.implodeContentTypeToCsv(true) + MYSQL_CSV_END; + sqlStore->query_lock((query_str_cdr + query_str).c_str(), STORE_PROC_ID_CDR, storeId2); + f_store_flags->content = intToString(_sf_charts_cache); + f_store_flags->ifv.v._int = _sf_charts_cache; + query_str_cdr = MYSQL_MAIN_INSERT_CSV_HEADER("cdr") + cdr.implodeFields(",", "\"") + MYSQL_CSV_END + + MYSQL_MAIN_INSERT_CSV_ROW("cdr") + cdr.implodeContentTypeToCsv(true) + MYSQL_CSV_END; + sqlStore->query_lock((query_str_cdr + query_str).c_str(), + STORE_PROC_ID_CHARTS_CACHE, + opt_mysqlstore_max_threads_charts_cache > 1 && + sqlStore->getSize(STORE_PROC_ID_CHARTS_CACHE, 0) > 1000 ? + counterSqlStore % opt_mysqlstore_max_threads_charts_cache : + 0); + } else { + cdr.add(_sf_db | (useChartsCacheInStore() ? _sf_charts_cache : 0), "store_flags"); + string query_str_cdr = MYSQL_MAIN_INSERT_CSV_HEADER("cdr") + cdr.implodeFields(",", "\"") + MYSQL_CSV_END + + MYSQL_MAIN_INSERT_CSV_ROW("cdr") + cdr.implodeContentTypeToCsv(true) + MYSQL_CSV_END; + sqlStore->query_lock((query_str_cdr + query_str).c_str(), STORE_PROC_ID_CDR, storeId2); + } } else { - cdr.add(_sf_db | (useChartsCacheInStore() ? _sf_charts_cache : 0), "store_flags"); - string query_str_cdr = MYSQL_MAIN_INSERT_CSV_HEADER("cdr") + cdr.implodeFields(",", "\"") + MYSQL_CSV_END + - MYSQL_MAIN_INSERT_CSV_ROW("cdr") + cdr.implodeContentTypeToCsv(true) + MYSQL_CSV_END; - sqlStore->query_lock((query_str_cdr + query_str).c_str(), STORE_PROC_ID_CDR, storeId2); + sqlStore->query_lock(query_str.c_str(), STORE_PROC_ID_CDR, storeId2); } - } else { - sqlStore->query_lock(query_str.c_str(), STORE_PROC_ID_CDR, storeId2); } //cout << endl << endl << query_str << endl << endl << endl; @@ -8755,9 +8761,6 @@ Calltable::Calltable(SqlDb *sqlDb) { #endif _sync_lock_calls_hash = 0; _sync_lock_calls_listMAP = 0; - for(int i = 0; i < preProcessPacketCallX_count; i++) { - _sync_lock_calls_listMAP_X[i] = 0; - } _sync_lock_calls_mergeMAP = 0; _sync_lock_registers_listMAP = 0; _sync_lock_calls_queue = 0; @@ -8772,6 +8775,17 @@ Calltable::Calltable(SqlDb *sqlDb) { _sync_lock_process_ss7_listmap = 0; _sync_lock_process_ss7_queue = 0; + if(preProcessPacketCallX_count > 0) { + calls_listMAP_X = new FILE_LINE(0) map[preProcessPacketCallX_count]; + _sync_lock_calls_listMAP_X = new FILE_LINE(0) volatile int[preProcessPacketCallX_count]; + for(int i = 0; i < preProcessPacketCallX_count; i++) { + _sync_lock_calls_listMAP_X[i] = 0; + } + } else { + calls_listMAP_X = NULL; + _sync_lock_calls_listMAP_X = NULL; + } + extern int opt_audioqueue_threads_max; audioQueueThreadsMax = min(max(2l, sysconf( _SC_NPROCESSORS_ONLN ) - 1), (long)opt_audioqueue_threads_max); audioQueueTerminating = 0; @@ -8818,6 +8832,13 @@ Calltable::~Calltable() { pthread_mutex_destroy(®isters_listMAPlock); */ + if(calls_listMAP_X) { + delete [] calls_listMAP_X; + } + if(_sync_lock_calls_listMAP_X) { + delete [] _sync_lock_calls_listMAP_X; + } + if(asyncSystemCommand) { delete asyncSystemCommand; } @@ -10333,32 +10354,34 @@ Calltable::getCallTableJson(char *params, bool *zip) { } unsigned int now = time(NULL); - unsigned activeCallsMax = getApproxCountCalls(); + unsigned activeCallsMax = getCountCalls(); activeCallsMax += activeCallsMax / 4; Call **activeCalls = new FILE_LINE(0) Call*[activeCallsMax]; unsigned activeCallsCount = 0; - for(int passListMap = -1; passListMap < (opt_t2_boost == 2 ? preProcessPacketCallX_count : 0); passListMap++) { - map *_calls_listMAP; - if(passListMap == -1) { - lock_calls_listMAP(); - _calls_listMAP = &calls_listMAP; - } else { - lock_calls_listMAP_X(passListMap); - _calls_listMAP = &calls_listMAP_X[passListMap]; - } - list::iterator callIT1; - map::iterator callMAPIT1; - map::iterator callMAPIT2; - for(int passTypeCall = 0; passTypeCall < (passListMap == -1 ? 2 : 1); passTypeCall++) { - int typeCall = passTypeCall == 0 ? INVITE : MGCP; + for(int passTypeCall = 0; passTypeCall < 2; passTypeCall++) { + int typeCall = passTypeCall == 0 ? INVITE : MGCP; + for(int passListMap = -1; passListMap < (typeCall == INVITE && useCallFindX() ? preProcessPacketCallX_count : 0); passListMap++) { + map *_calls_listMAP; + list::iterator callIT1; + map::iterator callMAPIT1; + map::iterator callMAPIT2; if(typeCall == INVITE) { if(opt_call_id_alternative[0]) { + lock_calls_listMAP(); callIT1 = calltable->calls_list.begin(); } else { + if(passListMap == -1) { + lock_calls_listMAP(); + _calls_listMAP = &calls_listMAP; + } else { + lock_calls_listMAP_X(passListMap); + _calls_listMAP = &calls_listMAP_X[passListMap]; + } callMAPIT1 = _calls_listMAP->begin(); } } else { + lock_calls_listMAP(); callMAPIT2 = calltable->calls_by_stream_callid_listMAP.begin(); } while(typeCall == INVITE ? @@ -10396,11 +10419,19 @@ Calltable::getCallTableJson(char *params, bool *zip) { ++callMAPIT2; } } - } - if(passListMap == -1) { - unlock_calls_listMAP(); - } else { - unlock_calls_listMAP_X(passListMap); + if(typeCall == INVITE) { + if(opt_call_id_alternative[0]) { + unlock_calls_listMAP(); + } else { + if(passListMap == -1) { + unlock_calls_listMAP(); + } else { + unlock_calls_listMAP_X(passListMap); + } + } + } else { + unlock_calls_listMAP(); + } } } unsigned custom_headers_size = 0; @@ -10682,7 +10713,7 @@ Calltable::cleanup_calls( struct timeval *currtime, bool forceClose, const char syslog(LOG_NOTICE, "call Calltable::cleanup_calls"); } - unsigned closeCallsMax = getApproxCountCalls(); + unsigned closeCallsMax = getCountCalls(); if(!closeCallsMax) { return 0; } @@ -10691,28 +10722,30 @@ Calltable::cleanup_calls( struct timeval *currtime, bool forceClose, const char unsigned closeCallsCount = 0; int rejectedCalls_count = 0; - for(int passListMap = -1; passListMap < (opt_t2_boost == 2 ? preProcessPacketCallX_count : 0); passListMap++) { - map *_calls_listMAP; - if(passListMap == -1) { - lock_calls_listMAP(); - _calls_listMAP = &calls_listMAP; - } else { - lock_calls_listMAP_X(passListMap); - _calls_listMAP = &calls_listMAP_X[passListMap]; - } - list::iterator callIT1; - map::iterator callMAPIT1; - map::iterator callMAPIT2; - for(int passTypeCall = 0; passTypeCall < (passListMap == -1 ? 2 : 1); passTypeCall++) { - int typeCall = passTypeCall == 0 ? INVITE : MGCP; + for(int passTypeCall = 0; passTypeCall < 2; passTypeCall++) { + int typeCall = passTypeCall == 0 ? INVITE : MGCP; + for(int passListMap = -1; passListMap < (typeCall == INVITE && useCallFindX() ? preProcessPacketCallX_count : 0); passListMap++) { + map *_calls_listMAP; + list::iterator callIT1; + map::iterator callMAPIT1; + map::iterator callMAPIT2; if(typeCall == INVITE) { if(opt_call_id_alternative[0]) { - callIT1 = calls_list.begin(); + lock_calls_listMAP(); + callIT1 = calltable->calls_list.begin(); } else { + if(passListMap == -1) { + lock_calls_listMAP(); + _calls_listMAP = &calls_listMAP; + } else { + lock_calls_listMAP_X(passListMap); + _calls_listMAP = &calls_listMAP_X[passListMap]; + } callMAPIT1 = _calls_listMAP->begin(); } } else { - callMAPIT2 = calls_by_stream_callid_listMAP.begin(); + lock_calls_listMAP(); + callMAPIT2 = calltable->calls_by_stream_callid_listMAP.begin(); } while(typeCall == INVITE ? (opt_call_id_alternative[0] ? @@ -10815,11 +10848,19 @@ Calltable::cleanup_calls( struct timeval *currtime, bool forceClose, const char } } } - } - if(passListMap == -1) { - unlock_calls_listMAP(); - } else { - unlock_calls_listMAP_X(passListMap); + if(typeCall == INVITE) { + if(opt_call_id_alternative[0]) { + unlock_calls_listMAP(); + } else { + if(passListMap == -1) { + unlock_calls_listMAP(); + } else { + unlock_calls_listMAP_X(passListMap); + } + } + } else { + unlock_calls_listMAP(); + } } } for(unsigned i = 0; i < closeCallsCount; i++) { @@ -10829,7 +10870,7 @@ Calltable::cleanup_calls( struct timeval *currtime, bool forceClose, const char } // Close RTP dump file ASAP to save file handles if((!currtime && is_terminating()) || - (opt_t2_boost && preProcessPacketCallX[0]->isActiveOutThread())) { + (useCallX() && preProcessPacketCallX && preProcessPacketCallX[0]->isActiveOutThread())) { call->getPcap()->close(); call->getPcapSip()->close(); } @@ -11027,21 +11068,21 @@ void Calltable::addSystemCommand(const char *command) { } } -unsigned Calltable::getApproxCountCalls() { - unsigned count = 0; - for(int passListMap = -1; passListMap < (opt_t2_boost == 2 ? preProcessPacketCallX_count : 0); passListMap++) { - map *_calls_listMAP; - if(passListMap == -1) { - _calls_listMAP = &calls_listMAP; - } else { - _calls_listMAP = &calls_listMAP_X[passListMap]; - } - for(int passTypeCall = 0; passTypeCall < (passListMap == -1 ? 2 : 1); passTypeCall++) { - int typeCall = passTypeCall == 0 ? INVITE : MGCP; +size_t Calltable::getCountCalls() { + size_t count = 0; + for(int passTypeCall = 0; passTypeCall < 2; passTypeCall++) { + int typeCall = passTypeCall == 0 ? INVITE : MGCP; + for(int passListMap = -1; passListMap < (typeCall == INVITE && useCallFindX() ? preProcessPacketCallX_count : 0); passListMap++) { if(typeCall == INVITE) { if(opt_call_id_alternative[0]) { count += calls_list.size(); } else { + map *_calls_listMAP; + if(passListMap == -1) { + _calls_listMAP = &calls_listMAP; + } else { + _calls_listMAP = &calls_listMAP_X[passListMap]; + } count += _calls_listMAP->size(); } } else { @@ -11052,6 +11093,26 @@ unsigned Calltable::getApproxCountCalls() { return(count); } +bool Calltable::enableCallX() { + return(opt_t2_boost && opt_t2_boost_call_threads > 0); +} + +bool Calltable::useCallX() { + return(enableCallX() && + (preProcessPacketCallX_state == PreProcessPacket::callx_process || + preProcessPacketCallX_state == PreProcessPacket::callx_find)); +} + +bool Calltable::enableCallFindX() { + return(opt_t2_boost && opt_t2_boost_call_threads > 0 && opt_t2_boost_call_find_threads && + !opt_call_id_alternative[0]); +} + +bool Calltable::useCallFindX() { + return(enableCallFindX() && + preProcessPacketCallX_state == PreProcessPacket::callx_find); +} + void Call::saveregister(struct timeval *currtime) { ((Calltable*)calltable)->lock_registers_listMAP(); diff --git a/calltable.h b/calltable.h index c8b5067ec..7e392e75f 100644 --- a/calltable.h +++ b/calltable.h @@ -2002,7 +2002,7 @@ class Calltable { queue files_sqlqueue; //!< this queue is used for asynchronous storing CDR by the worker thread list calls_list; map calls_listMAP; - map calls_listMAP_X[preProcessPacketCallX_count]; + map *calls_listMAP_X; map calls_by_stream_callid_listMAP; map calls_by_stream_id2_listMAP; map calls_by_stream_listMAP; @@ -2090,21 +2090,11 @@ class Calltable { Call *add_mgcp(sMgcpRequest *request, time_t time, vmIP saddr, vmPort sport, vmIP daddr, vmPort dport, pcap_t *handle, int dlt, int sensorId); - size_t calls_list_count() { - extern char opt_call_id_alternative[256]; - extern int opt_t2_boost; - if(opt_call_id_alternative[0]) { - return(calls_list.size()); - } else if(opt_t2_boost == 2) { - size_t count = 0; - for(int i = 0; i < preProcessPacketCallX_count; i++) { - count += calls_listMAP_X[i].size(); - } - return(count); - } else { - return(calls_listMAP.size()); - } - } + size_t getCountCalls(); + bool enableCallX(); + bool useCallX(); + bool enableCallFindX(); + bool useCallFindX(); /** * @brief find Call by call_id @@ -2510,8 +2500,6 @@ class Calltable { void addSystemCommand(const char *command); - unsigned getApproxCountCalls(); - private: /* pthread_mutex_t qlock; //!< mutex locking calls_queue @@ -2537,7 +2525,7 @@ class Calltable { #endif volatile int _sync_lock_calls_hash; volatile int _sync_lock_calls_listMAP; - volatile int _sync_lock_calls_listMAP_X[preProcessPacketCallX_count]; + volatile int *_sync_lock_calls_listMAP_X; volatile int _sync_lock_calls_mergeMAP; volatile int _sync_lock_registers_listMAP; volatile int _sync_lock_calls_queue; diff --git a/common.h b/common.h index f675b953a..baa605b34 100644 --- a/common.h +++ b/common.h @@ -119,6 +119,7 @@ struct sVerbose { int charts_cache_filters_eval_rslt_true; char sipcallerip_filter[100]; char sipcalledip_filter[100]; + int suppress_cdr_insert; int suppress_server_store; int suppress_fork; char *trace_call; diff --git a/pcap_queue.cpp b/pcap_queue.cpp index 3fbdb24e8..aa33769dc 100644 --- a/pcap_queue.cpp +++ b/pcap_queue.cpp @@ -121,8 +121,9 @@ extern Calltable *calltable; extern volatile int calls_counter; extern volatile int registers_counter; extern PreProcessPacket *preProcessPacket[PreProcessPacket::ppt_end_base]; -extern PreProcessPacket *preProcessPacketCallX[]; -extern PreProcessPacket *preProcessPacketCallFindX[]; +extern PreProcessPacket **preProcessPacketCallX; +extern PreProcessPacket **preProcessPacketCallFindX; +extern int preProcessPacketCallX_count; extern ProcessRtpPacket *processRtpPacketHash; extern ProcessRtpPacket *processRtpPacketDistribute[MAX_PROCESS_RTP_PACKET_THREADS]; extern TcpReassembly *tcpReassemblyHttp; @@ -1428,7 +1429,7 @@ void PcapQueue::pcapStat(int statPeriod, bool statCalls) { string statString = "\n"; if(statCalls) { ostringstream outStr; - outStr << "CALLS: " << calltable->calls_list_count() << ", " << calls_counter; + outStr << "CALLS: " << calltable->getCountCalls() << ", " << calls_counter; if(opt_ipaccount) { outStr << " IPACC_BUFFER " << lengthIpaccBuffer(); } @@ -1476,7 +1477,7 @@ void PcapQueue::pcapStat(int statPeriod, bool statCalls) { lapTimeDescr.push_back("check heap"); } if(!this->isMirrorSender()) { - outStr << "calls[" << (calltable->calls_list_count() + calltable->calls_by_stream_callid_listMAP.size()) << ",r:" << calltable->registers_listMAP.size() << "]" + outStr << "calls[" << calltable->getCountCalls() << ",r:" << calltable->registers_listMAP.size() << "]" << "[" << calls_counter << ",r:" << registers_counter << "]"; calltable->lock_calls_audioqueue(); size_t audioQueueSize = calltable->audio_queue.size(); @@ -1509,7 +1510,7 @@ void PcapQueue::pcapStat(int statPeriod, bool statCalls) { outStr << "ipacc_buffer[" << lengthIpaccBuffer() << "/" << sizeIpaccBuffer() << "] "; } if (opt_rrd) { - rrd_set_value(RRD_VALUE_inv, calltable->calls_list_count()); + rrd_set_value(RRD_VALUE_inv, calltable->getCountCalls()); rrd_set_value(RRD_VALUE_reg, calltable->registers_listMAP.size()); } if(sverb.log_profiler) { @@ -2240,47 +2241,49 @@ void PcapQueue::pcapStat(int statPeriod, bool statCalls) { } } } - if(preProcessPacketCallX[0]) { - for(int i = 0; i < preProcessPacketCallX_count + 1; i++) { - double percFullQring; - double t2cpu_preprocess_packet_out_thread = preProcessPacketCallX[i]->getCpuUsagePerc(true, &percFullQring); - if(t2cpu_preprocess_packet_out_thread >= 0) { - outStrStat << "/" - << preProcessPacketCallX[i]->getShortcatTypeThread() - << setprecision(1) << t2cpu_preprocess_packet_out_thread; - if(sverb.qring_stat) { - double qringFillingPerc = preProcessPacketCallX[i]->getQringFillingPerc(); - if(qringFillingPerc > 0) { - outStrStat << "r" << qringFillingPerc; + if(opt_t2_boost) { + if(preProcessPacketCallX && calltable->useCallX()) { + for(int i = 0; i < preProcessPacketCallX_count + 1; i++) { + double percFullQring; + double t2cpu_preprocess_packet_out_thread = preProcessPacketCallX[i]->getCpuUsagePerc(true, &percFullQring); + if(t2cpu_preprocess_packet_out_thread >= 0) { + outStrStat << "/" + << preProcessPacketCallX[i]->getShortcatTypeThread() + << setprecision(1) << t2cpu_preprocess_packet_out_thread; + if(sverb.qring_stat) { + double qringFillingPerc = preProcessPacketCallX[i]->getQringFillingPerc(); + if(qringFillingPerc > 0) { + outStrStat << "r" << qringFillingPerc; + } } - } - if(sverb.qring_full && percFullQring > sverb.qring_full) { - outStrStat << "#" << percFullQring; - } - ++count_t2cpu; - sum_t2cpu += t2cpu_preprocess_packet_out_thread; - } - } - } - if(preProcessPacketCallFindX[0]) { - for(int i = 0; i < preProcessPacketCallX_count; i++) { - double percFullQring; - double t2cpu_preprocess_packet_out_thread = preProcessPacketCallFindX[i]->getCpuUsagePerc(true, &percFullQring); - if(t2cpu_preprocess_packet_out_thread >= 0) { - outStrStat << "/" - << preProcessPacketCallFindX[i]->getShortcatTypeThread() - << setprecision(1) << t2cpu_preprocess_packet_out_thread; - if(sverb.qring_stat) { - double qringFillingPerc = preProcessPacketCallFindX[i]->getQringFillingPerc(); - if(qringFillingPerc > 0) { - outStrStat << "r" << qringFillingPerc; + if(sverb.qring_full && percFullQring > sverb.qring_full) { + outStrStat << "#" << percFullQring; } + ++count_t2cpu; + sum_t2cpu += t2cpu_preprocess_packet_out_thread; } - if(sverb.qring_full && percFullQring > sverb.qring_full) { - outStrStat << "#" << percFullQring; + } + } + if(preProcessPacketCallFindX && calltable->useCallFindX()) { + for(int i = 0; i < preProcessPacketCallX_count; i++) { + double percFullQring; + double t2cpu_preprocess_packet_out_thread = preProcessPacketCallFindX[i]->getCpuUsagePerc(true, &percFullQring); + if(t2cpu_preprocess_packet_out_thread >= 0) { + outStrStat << "/" + << preProcessPacketCallFindX[i]->getShortcatTypeThread() + << setprecision(1) << t2cpu_preprocess_packet_out_thread; + if(sverb.qring_stat) { + double qringFillingPerc = preProcessPacketCallFindX[i]->getQringFillingPerc(); + if(qringFillingPerc > 0) { + outStrStat << "r" << qringFillingPerc; + } + } + if(sverb.qring_full && percFullQring > sverb.qring_full) { + outStrStat << "#" << percFullQring; + } + ++count_t2cpu; + sum_t2cpu += t2cpu_preprocess_packet_out_thread; } - ++count_t2cpu; - sum_t2cpu += t2cpu_preprocess_packet_out_thread; } } } @@ -2354,7 +2357,7 @@ void PcapQueue::pcapStat(int statPeriod, bool statCalls) { } if(call_t2cpu_preprocess_packet_out_thread > opt_cpu_limit_new_thread_high && heapPerc > 10 && - opt_t2_boost && !preProcessPacketCallX[0]->isActiveOutThread()) { + calltable->enableCallX() && !calltable->useCallX()) { PreProcessPacket::autoStartCallX_PreProcessPacket(); } if(last_t2cpu_preprocess_packet_out_thread_rtp > opt_cpu_limit_new_thread) { @@ -2491,11 +2494,11 @@ void PcapQueue::pcapStat(int statPeriod, bool statCalls) { } if(storing_cdr_cpu_avg > opt_cpu_limit_new_thread_high && calls_counter > 10000 && - calls_counter > (int)calltable->calls_list_count() * 2) { + calls_counter > (int)calltable->getCountCalls() * 2) { extern void storing_cdr_next_thread_add(); storing_cdr_next_thread_add(); } else if(storing_cdr_cpu_avg < opt_cpu_limit_delete_thread && - calls_counter < calltable->calls_list_count() * 1.5) { + calls_counter < (int)calltable->getCountCalls() * 1.5) { extern void storing_cdr_next_thread_remove(); storing_cdr_next_thread_remove(); } diff --git a/sniff.cpp b/sniff.cpp index 06f64f541..590c59c8c 100644 --- a/sniff.cpp +++ b/sniff.cpp @@ -236,10 +236,12 @@ extern int opt_182queuedpauserecording; extern SocketSimpleBufferWrite *sipSendSocket; extern int opt_sip_send_before_packetbuffer; extern PreProcessPacket *preProcessPacket[PreProcessPacket::ppt_end_base]; -extern PreProcessPacket *preProcessPacketCallX[]; -extern PreProcessPacket *preProcessPacketCallFindX[]; +extern PreProcessPacket **preProcessPacketCallX; +extern PreProcessPacket **preProcessPacketCallFindX; +extern int preProcessPacketCallX_count; extern ProcessRtpPacket *processRtpPacketHash; extern ProcessRtpPacket *processRtpPacketDistribute[MAX_PROCESS_RTP_PACKET_THREADS]; +extern volatile PreProcessPacket::eCallX_state preProcessPacketCallX_state; extern CustomHeaders *custom_headers_cdr; extern CustomHeaders *custom_headers_message; extern CustomHeaders *custom_headers_sip_msg; @@ -5360,10 +5362,10 @@ inline void process_packet__cleanup_calls(timeval *ts_input, const char *file, i if(verbosity > 0 && is_read_from_file_simple()) { if(opt_dup_check) { syslog(LOG_NOTICE, "Active calls [%d] calls in sql queue [%d] skipped dupe pkts [%u]\n", - (int)calltable->calls_list_count(), (int)calltable->calls_queue.size(), duplicate_counter); + (int)calltable->getCountCalls(), (int)calltable->calls_queue.size(), duplicate_counter); } else { syslog(LOG_NOTICE, "Active calls [%d] calls in sql queue [%d]\n", - (int)calltable->calls_list_count(), (int)calltable->calls_queue.size()); + (int)calltable->getCountCalls(), (int)calltable->calls_queue.size()); } } process_packet__last_cleanup_calls = ts.tv_sec; @@ -6841,7 +6843,7 @@ void readdump_libpcap(pcap_t *handle, u_int16_t handle_index) { _process_packet__cleanup_calls(&HPH(header_packet)->ts, __FILE__, __LINE__); ostringstream outStr; outStr << fixed; - outStr << "calls[" << (calltable->calls_list_count() + calltable->calls_by_stream_callid_listMAP.size()) << ",r:" << calltable->registers_listMAP.size() << "]" + outStr << "calls[" << calltable->getCountCalls() << ",r:" << calltable->registers_listMAP.size() << "]" << "[" << calls_counter << ",r:" << registers_counter << "]"; syslog(LOG_NOTICE, "%s", outStr.str().c_str()); lastStatTimeMS = timeMS; @@ -7904,14 +7906,16 @@ void *PreProcessPacket::outThreadFunction() { if(!opt_t2_boost) { preProcessPacket[ppt_pp_rtp]->push_batch(); } - if(opt_t2_boost == 2 && preProcessPacketCallFindX[0]->isActiveOutThread()) { + if(opt_t2_boost && preProcessPacketCallX_state == PreProcessPacket::callx_find && + preProcessPacketCallFindX[0]->isActiveOutThread()) { for(int i = 0; i < preProcessPacketCallX_count; i++) { preProcessPacketCallFindX[i]->push_batch(); } } break; case ppt_pp_call: - if(opt_t2_boost == 1 && preProcessPacketCallX[0]->isActiveOutThread()) { + if(opt_t2_boost && preProcessPacketCallX_state == PreProcessPacket::callx_process && + preProcessPacketCallX[0]->isActiveOutThread()) { for(int i = 0; i < preProcessPacketCallX_count; i++) { preProcessPacketCallX[i]->push_batch(); } @@ -7921,7 +7925,8 @@ void *PreProcessPacket::outThreadFunction() { } break; case ppt_pp_callx: - if(opt_t2_boost && preProcessPacketCallX[0]->isActiveOutThread() && + if(opt_t2_boost && preProcessPacketCallX_state != PreProcessPacket::callx_na && + preProcessPacketCallX[0]->isActiveOutThread() && idPreProcessThread == preProcessPacketCallX_count) { _process_packet__cleanup_calls(__FILE__, __LINE__); } @@ -8012,7 +8017,7 @@ void PreProcessPacket::push_batch_nothread() { preProcessPacket[ppt_pp_rtp]->push_batch(); } } - if(opt_t2_boost == 2) { + if(opt_t2_boost && preProcessPacketCallX_state == PreProcessPacket::callx_find) { for(int i = 0; i < preProcessPacketCallX_count; i++) { if(!preProcessPacketCallFindX[i]->outThreadState) { preProcessPacketCallFindX[i]->push_batch(); @@ -8021,7 +8026,7 @@ void PreProcessPacket::push_batch_nothread() { } break; case ppt_pp_call: - if(opt_t2_boost == 1) { + if(opt_t2_boost && preProcessPacketCallX_state == PreProcessPacket::callx_process) { for(int i = 0; i < preProcessPacketCallX_count; i++) { if(!preProcessPacketCallX[i]->outThreadState) { preProcessPacketCallX[i]->push_batch(); @@ -8234,7 +8239,8 @@ void PreProcessPacket::process_SIP_EXTEND(packet_s_process *packetS) { packetS->blockstore_addflag(101 /*pb lock flag*/); bool pushed = false; if(!packetS->is_register()) { - if(opt_t2_boost == 2 && preProcessPacketCallFindX[0]->isActiveOutThread()) { + if(opt_t2_boost && preProcessPacketCallX_state == PreProcessPacket::callx_find && + preProcessPacketCallFindX[0]->isActiveOutThread()) { preProcessPacketCallFindX[packetS->get_callid_sipextx_index()]->push_packet(packetS); pushed = true; } else { @@ -8281,7 +8287,8 @@ void PreProcessPacket::process_CALL(packet_s_process *packetS) { (packetS->_createCall && packetS->call_created && packetS->call_created->typeIs(BYE)))) { process_packet_sip_alone_bye(packetS); } else { - if(opt_t2_boost == 1 && preProcessPacketCallX[0]->isActiveOutThread()) { + if(opt_t2_boost && preProcessPacketCallX_state == PreProcessPacket::callx_process && + preProcessPacketCallX[0]->isActiveOutThread()) { Call *call = packetS->call ? packetS->call : packetS->call_created; preProcessPacketCallX[call ? call->counter % preProcessPacketCallX_count : 0]->push_packet(packetS); return; @@ -8695,12 +8702,15 @@ void PreProcessPacket::autoStartCallX_PreProcessPacket() { preProcessPacketCallX[i]->startOutThread(); } } - if(opt_t2_boost == 2) { + if(calltable->enableCallFindX()) { for(int i = 0; i < preProcessPacketCallX_count; i++) { if(!preProcessPacketCallFindX[i]->outThreadState) { preProcessPacketCallFindX[i]->startOutThread(); } } + preProcessPacketCallX_state = PreProcessPacket::callx_find; + } else { + preProcessPacketCallX_state = PreProcessPacket::callx_process; } } } diff --git a/sniff.h b/sniff.h index 6cecade37..b41d211c8 100644 --- a/sniff.h +++ b/sniff.h @@ -559,6 +559,7 @@ struct packet_s_process : public packet_s_process_0 { return(callid_long ? callid_long : callid); } inline u_int8_t get_callid_sipextx_index() { + extern int preProcessPacketCallX_count; char *_callid = callid_long ? callid_long : callid; unsigned length = 0; while(length < 6 && _callid[length]) { diff --git a/sniff_proc_class.h b/sniff_proc_class.h index e781483b7..57a68671a 100644 --- a/sniff_proc_class.h +++ b/sniff_proc_class.h @@ -264,6 +264,11 @@ class PreProcessPacket { ppt_pp_callx, ppt_pp_callfindx }; + enum eCallX_state { + callx_na, + callx_process, + callx_find + }; struct batch_packet_s { batch_packet_s(unsigned max_count) { count = 0; diff --git a/voipmonitor.cpp b/voipmonitor.cpp index a6f23c2df..7fc6dcc4a 100644 --- a/voipmonitor.cpp +++ b/voipmonitor.cpp @@ -818,6 +818,9 @@ int opt_pcapdump = 0; int opt_callend = 1; //if true, cdr.called is saved bool opt_disable_cdr_indexes_rtp; int opt_t2_boost = false; +int opt_t2_boost_call_find_threads = false; +int opt_t2_boost_call_threads = 3; +int opt_storing_cdr_max_next_threads = 3; char opt_spooldir_main[1024]; char opt_spooldir_rtp[1024]; char opt_spooldir_graph[1024]; @@ -838,16 +841,20 @@ char opt_cachedir[1024]; int opt_upgrade_try_http_if_https_fail = 0; -#define MAXIMUM_STORING_CDR_THREADS 3 pthread_t storing_cdr_thread; // ID of worker storing CDR thread int storing_cdr_tid; pstat_data storing_cdr_thread_pstat_data[2]; -pthread_t storing_cdr_next_threads[MAXIMUM_STORING_CDR_THREADS]; // ID of worker storing CDR next threads -int storing_cdr_next_tid[MAXIMUM_STORING_CDR_THREADS]; // ID of worker storing CDR next threads -pstat_data storing_cdr_next_threads_pstat_data[MAXIMUM_STORING_CDR_THREADS][2]; -sem_t storing_cdr_next_threads_sem[MAXIMUM_STORING_CDR_THREADS][2]; -bool storing_cdr_next_threads_init[MAXIMUM_STORING_CDR_THREADS]; -list *storing_cdr_next_threads_calls[MAXIMUM_STORING_CDR_THREADS]; +struct sStoringCdrNextThreads { + sStoringCdrNextThreads() { + memset(this, 0, sizeof(*this)); + } + pthread_t thread; + int tid; + pstat_data pstat[2]; + sem_t sem[2]; + bool init; + list *calls; +} *storing_cdr_next_threads; volatile int storing_cdr_next_threads_count; volatile int storing_cdr_next_threads_count_mod; volatile int storing_cdr_next_threads_count_mod_request; @@ -927,10 +934,12 @@ PcapQueue_readFromInterface *pcapQueueInterface; PcapQueue *pcapQueueStatInterface; PreProcessPacket *preProcessPacket[PreProcessPacket::ppt_end_base]; -PreProcessPacket *preProcessPacketCallX[preProcessPacketCallX_count + 1]; -PreProcessPacket *preProcessPacketCallFindX[preProcessPacketCallX_count]; +PreProcessPacket **preProcessPacketCallX; +PreProcessPacket **preProcessPacketCallFindX; +int preProcessPacketCallX_count; ProcessRtpPacket *processRtpPacketHash; ProcessRtpPacket *processRtpPacketDistribute[MAX_PROCESS_RTP_PACKET_THREADS]; +volatile PreProcessPacket::eCallX_state preProcessPacketCallX_state = PreProcessPacket::callx_na; TcpReassembly *tcpReassemblyHttp; TcpReassembly *tcpReassemblyWebrtc; @@ -1814,23 +1823,23 @@ void *storing_cdr( void */*dummy*/ ) { while(__sync_lock_test_and_set(&storing_cdr_next_threads_count_sync, 1)); storing_cdr_next_threads_count_mod = storing_cdr_next_threads_count_mod_request; storing_cdr_next_threads_count_mod_request = 0; - if((storing_cdr_next_threads_count_mod > 0 && storing_cdr_next_threads_count == MAXIMUM_STORING_CDR_THREADS) || + if((storing_cdr_next_threads_count_mod > 0 && storing_cdr_next_threads_count == opt_storing_cdr_max_next_threads) || (storing_cdr_next_threads_count_mod < 0 && storing_cdr_next_threads_count == 0)) { storing_cdr_next_threads_count_mod = 0; } if(storing_cdr_next_threads_count_mod > 0) { syslog(LOG_NOTICE, "storing cdr - creating next thread %i", storing_cdr_next_threads_count + 1); - if(!storing_cdr_next_threads_init[storing_cdr_next_threads_count]) { - storing_cdr_next_threads_calls[storing_cdr_next_threads_count] = new FILE_LINE(0) list; + if(!storing_cdr_next_threads[storing_cdr_next_threads_count].init) { + storing_cdr_next_threads[storing_cdr_next_threads_count].calls = new FILE_LINE(0) list; for(int i = 0; i < 2; i++) { - sem_init(&storing_cdr_next_threads_sem[storing_cdr_next_threads_count][i], 0, 0); + sem_init(&storing_cdr_next_threads[storing_cdr_next_threads_count].sem[i], 0, 0); } - storing_cdr_next_threads_init[storing_cdr_next_threads_count] = true; + storing_cdr_next_threads[storing_cdr_next_threads_count].init = true; } - memset(storing_cdr_next_threads_pstat_data[storing_cdr_next_threads_count], 0, sizeof(storing_cdr_next_threads_pstat_data[storing_cdr_next_threads_count])); + memset(storing_cdr_next_threads[storing_cdr_next_threads_count].pstat, 0, sizeof(storing_cdr_next_threads[storing_cdr_next_threads_count].pstat)); void *storing_cdr_next_thread( void *_indexNextThread ); vm_pthread_create(("storing cdr - next thread " + intToString(storing_cdr_next_threads_count + 1)).c_str(), - &storing_cdr_next_threads[storing_cdr_next_threads_count], NULL, storing_cdr_next_thread, (void*)(long)(storing_cdr_next_threads_count), __FILE__, __LINE__); + &storing_cdr_next_threads[storing_cdr_next_threads_count].thread, NULL, storing_cdr_next_thread, (void*)(long)(storing_cdr_next_threads_count), __FILE__, __LINE__); while(storing_cdr_next_threads_count_mod > 0) { USLEEP(100000); } @@ -1855,7 +1864,7 @@ void *storing_cdr( void */*dummy*/ ) { if(!mod) { calls_for_store.push_back(call); } else { - storing_cdr_next_threads_calls[mod - 1]->push_back(call); + storing_cdr_next_threads[mod - 1].calls->push_back(call); } } else { calls_for_store.push_back(call); @@ -1874,7 +1883,7 @@ void *storing_cdr( void */*dummy*/ ) { if(calls_for_store_count || storing_cdr_next_threads_count_mod < 0) { if(storing_cdr_next_threads_count) { for(int i = 0; i < storing_cdr_next_threads_count; i++) { - sem_post(&storing_cdr_next_threads_sem[i][0]); + sem_post(&storing_cdr_next_threads[i].sem[0]); } } bool useConvertToWav = false; @@ -1959,7 +1968,7 @@ void *storing_cdr( void */*dummy*/ ) { delete [] indikConvertToWav; if(storing_cdr_next_threads_count) { for(int i = 0; i < storing_cdr_next_threads_count; i++) { - sem_wait(&storing_cdr_next_threads_sem[i][1]); + sem_wait(&storing_cdr_next_threads[i].sem[1]); } } if(storing_cdr_next_threads_count_mod < 0) { @@ -2021,22 +2030,22 @@ void *storing_cdr( void */*dummy*/ ) { void *storing_cdr_next_thread( void *_indexNextThread ) { int indexNextThread = (int)(long)_indexNextThread; - storing_cdr_next_tid[indexNextThread] = get_unix_tid(); + storing_cdr_next_threads[indexNextThread].tid = get_unix_tid(); if(storing_cdr_next_threads_count_mod > 0 && indexNextThread == storing_cdr_next_threads_count) { storing_cdr_next_threads_count_mod = 0; } while(terminating_storing_cdr < 2) { - sem_wait(&storing_cdr_next_threads_sem[indexNextThread][0]); + sem_wait(&storing_cdr_next_threads[indexNextThread].sem[0]); if(terminating_storing_cdr == 2) { break; } bool useConvertToWav = false; - unsigned indikConvertToWavSize = storing_cdr_next_threads_calls[indexNextThread]->size(); + unsigned indikConvertToWavSize = storing_cdr_next_threads[indexNextThread].calls->size(); char *indikConvertToWav = new FILE_LINE(0) char[indikConvertToWavSize]; memset(indikConvertToWav, 0, indikConvertToWavSize); unsigned counter = 0; - for(list::iterator iter_call = storing_cdr_next_threads_calls[indexNextThread]->begin(); iter_call != storing_cdr_next_threads_calls[indexNextThread]->end(); iter_call++) { + for(list::iterator iter_call = storing_cdr_next_threads[indexNextThread].calls->begin(); iter_call != storing_cdr_next_threads[indexNextThread].calls->end(); iter_call++) { Call *call = *iter_call; bool needConvertToWavInThread = false; call->closeRawFiles(); @@ -2074,7 +2083,7 @@ void *storing_cdr_next_thread( void *_indexNextThread ) { } list calls_for_delete; counter = 0; - for(list::iterator iter_call = storing_cdr_next_threads_calls[indexNextThread]->begin(); iter_call != storing_cdr_next_threads_calls[indexNextThread]->end(); iter_call++) { + for(list::iterator iter_call = storing_cdr_next_threads[indexNextThread].calls->begin(); iter_call != storing_cdr_next_threads[indexNextThread].calls->end(); iter_call++) { if(useConvertToWav && counter < indikConvertToWavSize && indikConvertToWav[counter]) { calltable->audio_queue.push_back(*iter_call); calltable->processCallsInAudioQueue(false); @@ -2106,13 +2115,13 @@ void *storing_cdr_next_thread( void *_indexNextThread ) { calltable->unlock_calls_deletequeue(); } delete [] indikConvertToWav; - storing_cdr_next_threads_calls[indexNextThread]->clear(); + storing_cdr_next_threads[indexNextThread].calls->clear(); bool stop = false; if(storing_cdr_next_threads_count_mod < 0 && (indexNextThread + 1) == storing_cdr_next_threads_count) { stop = true; } - sem_post(&storing_cdr_next_threads_sem[indexNextThread][1]); + sem_post(&storing_cdr_next_threads[indexNextThread].sem[1]); if(stop) { syslog(LOG_NOTICE, "storing cdr - stop next thread %i", indexNextThread + 1); break; @@ -2123,7 +2132,7 @@ void *storing_cdr_next_thread( void *_indexNextThread ) { void storing_cdr_next_thread_add() { if(getTimeS() > storing_cdr_next_threads_count_last_change + 120) { - if(storing_cdr_next_threads_count < MAXIMUM_STORING_CDR_THREADS && + if(storing_cdr_next_threads_count < opt_storing_cdr_max_next_threads && storing_cdr_next_threads_count_mod == 0 && storing_cdr_next_threads_count_mod_request == 0) { storing_cdr_next_threads_count_mod_request = 1; @@ -2155,7 +2164,7 @@ string storing_cdr_getCpuUsagePerc(double *avg) { ++cpu_count; } for(int i = 0; i < storing_cdr_next_threads_count; i++) { - double cpu = get_cpu_usage_perc(storing_cdr_next_tid[i], storing_cdr_next_threads_pstat_data[i]); + double cpu = get_cpu_usage_perc(storing_cdr_next_threads[i].tid, storing_cdr_next_threads[i].pstat); if(cpu > 0) { cpuStr << '/' << setprecision(1) << cpu; cpu_sum += cpu; @@ -3907,6 +3916,9 @@ int main_init_read() { chartsCacheInit(sqlDbInit); } + if(opt_t2_boost && opt_t2_boost_call_threads > 0) { + preProcessPacketCallX_count = opt_t2_boost_call_threads; + } calltable = new FILE_LINE(42013) Calltable(sqlDbInit); // if the system has more than one CPU enable threading @@ -4130,6 +4142,9 @@ int main_init_read() { // start thread processing queued cdr and sql queue - supressed if run as sender if(!is_sender() && !is_client_packetbuffer_sender()) { + if(opt_storing_cdr_max_next_threads) { + storing_cdr_next_threads = new FILE_LINE(0) sStoringCdrNextThreads[opt_storing_cdr_max_next_threads]; + } vm_pthread_create("storing cdr", &storing_cdr_thread, NULL, storing_cdr, NULL, __FILE__, __LINE__); vm_pthread_create("storing register", @@ -4188,11 +4203,13 @@ int main_init_read() { } } - if(opt_t2_boost) { + if(opt_t2_boost && opt_t2_boost_call_threads > 0) { + preProcessPacketCallX = new FILE_LINE(0) PreProcessPacket*[preProcessPacketCallX_count + 1]; for(int i = 0; i < preProcessPacketCallX_count + 1; i++) { preProcessPacketCallX[i] = new FILE_LINE(0) PreProcessPacket(PreProcessPacket::PreProcessPacket::ppt_pp_callx, i); } - if(opt_t2_boost == 2) { + if(calltable->enableCallFindX()) { + preProcessPacketCallFindX = new FILE_LINE(0) PreProcessPacket*[preProcessPacketCallX_count]; for(int i = 0; i < preProcessPacketCallX_count; i++) { preProcessPacketCallFindX[i] = new FILE_LINE(0) PreProcessPacket(PreProcessPacket::PreProcessPacket::ppt_pp_callfindx, i); } @@ -4202,6 +4219,7 @@ int main_init_read() { for(int i = 0; i < preProcessPacketCallX_count; i++) { preProcessPacketCallFindX[i]->startOutThread(); } + preProcessPacketCallX_state = PreProcessPacket::callx_find; } } @@ -4523,23 +4541,27 @@ void terminate_processpacket() { } for(int termPass = 0; termPass < 2; termPass++) { - for(int i = 0; i < preProcessPacketCallX_count + 1; i++) { - if(preProcessPacketCallX[i]) { - if(termPass == 0) { - preProcessPacketCallX[i]->terminate(); - } else { - delete preProcessPacketCallX[i]; - preProcessPacketCallX[i] = NULL; + if(preProcessPacketCallX) { + for(int i = 0; i < preProcessPacketCallX_count + 1; i++) { + if(preProcessPacketCallX[i]) { + if(termPass == 0) { + preProcessPacketCallX[i]->terminate(); + } else { + delete preProcessPacketCallX[i]; + preProcessPacketCallX[i] = NULL; + } } } } - for(int i = 0; i < preProcessPacketCallX_count; i++) { - if(preProcessPacketCallFindX[i]) { - if(termPass == 0) { - preProcessPacketCallFindX[i]->terminate(); - } else { - delete preProcessPacketCallFindX[i]; - preProcessPacketCallFindX[i] = NULL; + if(preProcessPacketCallFindX) { + for(int i = 0; i < preProcessPacketCallX_count; i++) { + if(preProcessPacketCallFindX[i]) { + if(termPass == 0) { + preProcessPacketCallFindX[i]->terminate(); + } else { + delete preProcessPacketCallFindX[i]; + preProcessPacketCallFindX[i] = NULL; + } } } } @@ -4555,6 +4577,17 @@ void terminate_processpacket() { } if(termPass == 0) { USLEEP(100000); + } else { + if(preProcessPacketCallX) { + delete [] preProcessPacketCallX; + preProcessPacketCallX = NULL; + } + if(preProcessPacketCallFindX) { + delete [] preProcessPacketCallFindX; + preProcessPacketCallFindX = NULL; + } + preProcessPacketCallX_count = 0; + preProcessPacketCallX_state = PreProcessPacket::callx_na; } } @@ -4672,16 +4705,25 @@ void main_term_read() { if(storing_cdr_thread) { terminating_storing_cdr = 1; pthread_join(storing_cdr_thread, NULL); - while(__sync_lock_test_and_set(&storing_cdr_next_threads_count_sync, 1)); - for(int i = 0; i < storing_cdr_next_threads_count; i++) { - sem_post(&storing_cdr_next_threads_sem[i][0]); - pthread_join(storing_cdr_next_threads[i], NULL); - for(int j = 0; j < 2; j++) { - sem_destroy(&storing_cdr_next_threads_sem[i][j]); + if(storing_cdr_next_threads) { + while(__sync_lock_test_and_set(&storing_cdr_next_threads_count_sync, 1)); + for(int i = 0; i < opt_storing_cdr_max_next_threads; i++) { + if(storing_cdr_next_threads[i].init) { + if(i < storing_cdr_next_threads_count) { + sem_post(&storing_cdr_next_threads[i].sem[0]); + pthread_join(storing_cdr_next_threads[i].thread, NULL); + } + for(int j = 0; j < 2; j++) { + sem_destroy(&storing_cdr_next_threads[i].sem[j]); + } + delete storing_cdr_next_threads[i].calls; + } } - delete storing_cdr_next_threads_calls[i]; + __sync_lock_release(&storing_cdr_next_threads_count_sync); + delete [] storing_cdr_next_threads; + storing_cdr_next_threads = NULL; } - __sync_lock_release(&storing_cdr_next_threads_count_sync); + storing_cdr_thread = 0; } if(storing_registers_thread) { terminating_storing_registers = 1; @@ -6446,8 +6488,10 @@ void cConfig::addConfigItems() { addConfigItem(new FILE_LINE(0) cConfigItem_string("mysqlcompress_type", opt_mysqlcompress_type, sizeof(opt_mysqlcompress_type))); addConfigItem(new FILE_LINE(42089) cConfigItem_yesno("sqlcallend", &opt_callend)); addConfigItem(new FILE_LINE(0) cConfigItem_yesno("disable_cdr_indexes_rtp", &opt_disable_cdr_indexes_rtp)); - addConfigItem((new FILE_LINE(42090) cConfigItem_yesno("t2_boost", &opt_t2_boost)) - ->addValues("extend:2")); + addConfigItem(new FILE_LINE(42090) cConfigItem_yesno("t2_boost", &opt_t2_boost)); + addConfigItem(new FILE_LINE(0) cConfigItem_yesno("t2_boost_enable_call_find_threads", &opt_t2_boost_call_find_threads)); + addConfigItem(new FILE_LINE(0) cConfigItem_integer("t2_boost_max_next_call_threads", &opt_t2_boost_call_threads)); + addConfigItem(new FILE_LINE(0) cConfigItem_integer("storing_cdr_max_next_threads", &opt_storing_cdr_max_next_threads)); subgroup("partitions"); addConfigItem(new FILE_LINE(42091) cConfigItem_yesno("disable_partition_operations", &opt_disable_partition_operations)); addConfigItem(new FILE_LINE(0) cConfigItem_hour_interval("partition_operations_enable_fromto", &opt_partition_operations_enable_run_hour_from, &opt_partition_operations_enable_run_hour_to)); @@ -7803,6 +7847,7 @@ void parse_verb_param(string verbParam) { strcpy_null_term(sverb.sipcallerip_filter, verbParam.c_str() + 19); else if(verbParam.substr(0, 19) == "sipcalledip_filter=") strcpy_null_term(sverb.sipcalledip_filter, verbParam.c_str() + 19); + else if(verbParam == "suppress_cdr_insert") sverb.suppress_cdr_insert = 1; else if(verbParam == "suppress_server_store") sverb.suppress_server_store = 1; else if(verbParam == "suppress_fork") sverb.suppress_fork = 1; else if(verbParam.substr(0, 11) == "trace_call=") @@ -10372,8 +10417,16 @@ int eval_config(string inistr) { opt_disable_cdr_indexes_rtp = yesno(value); } if((value = ini.GetValue("general", "t2_boost", NULL))) { - opt_t2_boost = !strcmp(value, "extend") ? 2 : - yesno(value); + opt_t2_boost = yesno(value); + } + if((value = ini.GetValue("general", "t2_boost_enable_call_find_threads", NULL))) { + opt_t2_boost_call_find_threads = yesno(value); + } + if((value = ini.GetValue("general", "t2_boost_max_next_call_threads", NULL))) { + opt_t2_boost_call_threads = atoi(value); + } + if((value = ini.GetValue("general", "storing_cdr_max_next_threads", NULL))) { + opt_storing_cdr_max_next_threads = atoi(value); } if((value = ini.GetValue("general", "destination_number_mode", NULL))) { opt_destination_number_mode = atoi(value); diff --git a/voipmonitor_define.h b/voipmonitor_define.h index 773fead0c..b4112cae2 100644 --- a/voipmonitor_define.h +++ b/voipmonitor_define.h @@ -66,7 +66,5 @@ #define CAST_OBJ_TO_VOID(obj) ((void*)(obj)) -#define preProcessPacketCallX_count 3 - #endif //VOIPMONITOR_DEFINE_H