Skip to content

Commit

Permalink
cloud router - debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
rbucek committed May 21, 2024
1 parent 177891b commit 5f71876
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 107 deletions.
156 changes: 55 additions & 101 deletions sql_db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,84 +581,24 @@ void SqlDb::setCsvInRemoteResult(bool useCsvInRemoteResult) {
this->useCsvInRemoteResult = useCsvInRemoteResult;
}

bool SqlDb::queryByCurl(string query, bool callFromStoreProcessWithFixDeadlock) {
clearLastError();
bool ok = false;
unsigned int attempt = 0;
unsigned int send_query_counter = 0;
for(unsigned int pass = 0; pass < this->maxQueryPass; pass++, attempt++) {
string preparedQuery = this->prepareQuery(query, !callFromStoreProcessWithFixDeadlock && send_query_counter > 1);
if(pass > 0) {
sleep(min(1 + pass * 2, 60u));
syslog(LOG_INFO, "next attempt %u - query: %s", attempt, prepareQueryForPrintf(preparedQuery).substr(0, 100).c_str());
}
SimpleBuffer responseBuffer;
s_get_curl_response_params curl_params(s_get_curl_response_params::_rt_post);
curl_params.addParam("query", preparedQuery.c_str());
curl_params.addParam("token", cloud_token.c_str());
get_curl_response(cloud_redirect.empty() ? cloud_host.c_str() : cloud_redirect.c_str(),
&responseBuffer, &curl_params);
if(!curl_params.error.empty()) {
setLastError(0, curl_params.error.c_str(), true);
continue;
}
if(responseBuffer.empty()) {
setLastError(0, "response is empty", true);
continue;
}
if(!responseBuffer.isJsonObject()) {
setLastError(0, "bad response - " + string(responseBuffer), true);
continue;
}
JsonItem jsonData;
jsonData.parse((char*)responseBuffer);
string result = jsonData.getValue("result");
trim(result);
if(!strncasecmp(result.c_str(), "REDIRECT TO", 11)) {
cloud_redirect = result.substr(11);
trim(cloud_redirect);
if(cloud_redirect.empty()) {
setLastError(0, "missing redirect ip / server", true);
} else {
pass = 0;
continue;
}
}
int rsltProcessResponse = processResponseFromQueryBy(responseBuffer, query.c_str(), pass);
send_query_counter++;
if(rsltProcessResponse == 1) {
ok = true;
break;
} else if(rsltProcessResponse == -1) {
break;
} else {
if(ignoreLastError() ||
(callFromStoreProcessWithFixDeadlock && getLastError() == ER_LOCK_DEADLOCK)) {
break;
}
}
}
return(ok);
}

bool SqlDb::queryByRemoteSocket(string query, bool callFromStoreProcessWithFixDeadlock, const char *dropProcQuery) {
clearLastError();
bool ok = false;
unsigned int attempt = 0;
unsigned int send_query_counter = 0;
u_int64_t startTimeMS = getTimeMS();
for(unsigned int pass = 0; pass < this->maxQueryPass; pass++, attempt++) {
if((is_terminating() > 1 && attempt > 2) ||
for(unsigned int pass = 0; pass < this->maxQueryPass; attempt++) {
if(is_terminating() > 1 ||
(isCloud() && is_read_from_file_simple() && getTimeMS() > startTimeMS + 5 * 1000)) {
break;
}
string preparedQuery = this->prepareQuery(query, !callFromStoreProcessWithFixDeadlock && send_query_counter > 1);
if(pass > 0) {
if(attempt > 0) {
if(this->remote_socket) {
delete this->remote_socket;
this->remote_socket = NULL;
}
sleep(min(1 + pass * 2, 60u));
sleep(min(1 + attempt * 2, 60u));
syslog(LOG_INFO, "next attempt %u - query: %s", attempt, prepareQueryForPrintf(preparedQuery.c_str()).substr(0, 100).c_str());
} else if(this->remote_socket && this->remote_socket->getLastTimeOkRead() && getTimeUS() > this->remote_socket->getLastTimeOkRead() + 10 * 1000000ull) {
if(!this->remote_socket->checkHandleRead()) {
Expand Down Expand Up @@ -728,39 +668,54 @@ bool SqlDb::queryByRemoteSocket(string query, bool callFromStoreProcessWithFixDe
if(this->useCsvInRemoteResult) {
preparedQuery = "CSV:" + preparedQuery;
}
int rsltProcessResponse = _queryByRemoteSocket(preparedQuery, pass);
eQueryByRemoteSocketRslt rsltProcessResponse = _queryByRemoteSocket(preparedQuery);
send_query_counter++;
if(rsltProcessResponse == 1) {
bool stop = false;
switch(rsltProcessResponse) {
case _qbrs_ok:
ok = true;
stop = true;
++pass;
break;
} else if(rsltProcessResponse == -1) {
break;
} else {
case _qbrs_mysql_error:
if(ignoreLastError() ||
(callFromStoreProcessWithFixDeadlock && getLastError() == ER_LOCK_DEADLOCK)) {
break;
}
if(this->getLastError() == ER_SP_ALREADY_EXISTS && pass >= 2) {
if(_queryByRemoteSocket("repair table mysql.proc", 0) == 1) {
stop = true;
} else if(this->getLastError() == ER_SP_ALREADY_EXISTS && pass >= 2) {
if(_queryByRemoteSocket("repair table mysql.proc") == 1) {
syslog(LOG_NOTICE, "success call 'repair table mysql.proc'");
} else {
syslog(LOG_NOTICE, "failed call 'repair table mysql.proc' with error: %s", this->getLastErrorString().c_str());
}
if(dropProcQuery) {
if(_queryByRemoteSocket(dropProcQuery, 0) == 1) {
if(_queryByRemoteSocket(dropProcQuery) == 1) {
syslog(LOG_NOTICE, "success call '%s'", dropProcQuery);
} else {
syslog(LOG_NOTICE, "failed call '%s' with error: %s", dropProcQuery, this->getLastErrorString().c_str());
}
}
}
++pass;
break;
case _qbrs_mysql_error_disable_next_attempt:
stop = true;
++pass;
break;
case _qbrs_failed_connect:
break;
case _qbrs_na:
++pass;
break;
}
if(stop) {
break;
}
}
this->useCsvInRemoteResult = false;
return(ok);
}

int SqlDb::_queryByRemoteSocket(string query, unsigned int pass) {
SqlDb::eQueryByRemoteSocketRslt SqlDb::_queryByRemoteSocket(string query) {
bool okSendQuery = true;
if(query.length() > 100) {
cGzip gzipCompressQuery;
Expand All @@ -779,39 +734,39 @@ int SqlDb::_queryByRemoteSocket(string query, unsigned int pass) {
}
if(!okSendQuery) {
setLastError(0, "failed send query", true);
return(0);
return(_qbrs_failed_connect);
}
u_char *queryResponse;
size_t queryResponseLength;
queryResponse = this->remote_socket->readBlock(&queryResponseLength, cSocket::_te_aes);
if(!queryResponse) {
setLastError(0, "failed read query response", true);
return(0);
return(_qbrs_failed_connect);
}
string queryResponseStr;
cGzip gzipDecompressResponse;
if(gzipDecompressResponse.isCompress(queryResponse, queryResponseLength)) {
queryResponseStr = gzipDecompressResponse.decompressString(queryResponse, queryResponseLength);
if(queryResponseStr.empty()) {
setLastError(0, "response is invalid (gunzip failed)", true);
return(0);
return(_qbrs_failed_connect);
}
} else {
queryResponseStr = string((char*)queryResponse, queryResponseLength);
}
if(queryResponseStr.empty()) {
setLastError(0, "response is empty", true);
return(0);
return(_qbrs_failed_connect);
}
if(isJsonObject(queryResponseStr)) {
return(processResponseFromQueryBy(queryResponseStr.c_str(), query.c_str(), pass));
return(processResponseFromQueryBy(queryResponseStr.c_str(), query.c_str()));
} else if(queryResponseStr.substr(0, 3) == "CSV") {
return(processResponseFromCsv(queryResponseStr.c_str()));
}
return(0);
return(_qbrs_na);
}

int SqlDb::processResponseFromQueryBy(const char *response, const char *query, unsigned pass) {
SqlDb::eQueryByRemoteSocketRslt SqlDb::processResponseFromQueryBy(const char *response, const char *query) {
response_data_columns.clear();
response_data_columns_types.clear();
response_data.clear();
Expand Down Expand Up @@ -851,16 +806,15 @@ int SqlDb::processResponseFromQueryBy(const char *response, const char *query, u
setLastError(errorCode, errorString.c_str(), true);
}
if(tryNext) {
if(sql_noerror || sql_disable_next_attempt_if_error ||
this->disableLogError || this->disableNextAttemptIfError ||
if(sql_disable_next_attempt_if_error ||
this->disableNextAttemptIfError ||
errorCode == ER_PARSE_ERROR) {
return(-1);
} else if(errorCode != CR_SERVER_GONE_ERROR &&
pass < this->maxQueryPass - 5) {
pass = this->maxQueryPass - 5;
return(_qbrs_mysql_error_disable_next_attempt);
} else {
return(_qbrs_mysql_error);
}
} else {
return(-1);
return(_qbrs_mysql_error_disable_next_attempt);
}
}
}
Expand Down Expand Up @@ -896,12 +850,12 @@ int SqlDb::processResponseFromQueryBy(const char *response, const char *query, u
}
}
}
return(1);
return(_qbrs_ok);
}
return(0);
return(_qbrs_na);
}

int SqlDb::processResponseFromCsv(const char *response) {
SqlDb::eQueryByRemoteSocketRslt SqlDb::processResponseFromCsv(const char *response) {
response_data_columns.clear();
response_data_columns_types.clear();
response_data.clear();
Expand Down Expand Up @@ -961,7 +915,7 @@ int SqlDb::processResponseFromCsv(const char *response) {
}
}
response_data_rows = row_counter > 1 ? row_counter - 1 : 0;
return(1);
return(_qbrs_ok);
}

string SqlDb::prepareQuery(string query, bool nextPass) {
Expand Down Expand Up @@ -2338,8 +2292,8 @@ bool SqlDb_mysql::query(string query, bool callFromStoreProcessWithFixDeadlock,
if(pass < this->maxQueryPass - 1) {
this->reconnect();
}
} else if(sql_noerror || sql_disable_next_attempt_if_error ||
this->disableLogError || this->disableNextAttemptIfError ||
} else if(sql_disable_next_attempt_if_error ||
this->disableNextAttemptIfError ||
this->ignoreLastError() ||
(callFromStoreProcessWithFixDeadlock && this->getLastError() == ER_LOCK_DEADLOCK)) {
break;
Expand Down Expand Up @@ -2374,8 +2328,8 @@ bool SqlDb_mysql::query(string query, bool callFromStoreProcessWithFixDeadlock,
}
}
}
if(!opt_load_query_main_from_files && pass < this->maxQueryPass - 5) {
pass = this->maxQueryPass - 5;
if(is_read_from_file() && pass < this->maxQueryPass - 10) {
pass = this->maxQueryPass - 10;
}
if(pass < this->maxQueryPass - 1) {
this->reconnect();
Expand Down Expand Up @@ -3202,17 +3156,17 @@ bool SqlDb_odbc::query(string query, bool /*callFromStoreProcessWithFixDeadlock*
if(!sql_noerror && !this->disableLogError) {
this->checkLastError("odbc query error", true);
}
if(sql_noerror || sql_disable_next_attempt_if_error ||
this->disableLogError || this->disableNextAttemptIfError) {
if(sql_disable_next_attempt_if_error ||
this->disableNextAttemptIfError) {
break;
}
else if(rslt == SQL_ERROR || rslt == SQL_INVALID_HANDLE) {
if(pass < this->maxQueryPass - 1) {
this->reconnect();
}
} else {
if(pass < this->maxQueryPass - 5) {
pass = this->maxQueryPass - 5;
if(is_read_from_file() && pass < this->maxQueryPass - 10) {
pass = this->maxQueryPass - 10;
}
if(pass < this->maxQueryPass - 1) {
this->reconnect();
Expand Down
14 changes: 10 additions & 4 deletions sql_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,13 @@ class SqlDb {
_tp_hour,
_tp_month
};
enum eQueryByRemoteSocketRslt {
_qbrs_na,
_qbrs_ok,
_qbrs_mysql_error,
_qbrs_mysql_error_disable_next_attempt,
_qbrs_failed_connect
};
struct sPartition {
sPartition() {
file_size = -1;
Expand Down Expand Up @@ -380,11 +387,10 @@ class SqlDb {
bool reconnect();
void setCsvInRemoteResult(bool useCsvInRemoteResult = true);
virtual bool query(string query, bool callFromStoreProcessWithFixDeadlock = false, const char *dropProcQuery = NULL) = 0;
bool queryByCurl(string query, bool callFromStoreProcessWithFixDeadlock = false);
bool queryByRemoteSocket(string query, bool callFromStoreProcessWithFixDeadlock = false, const char *dropProcQuery = NULL);
int _queryByRemoteSocket(string query, unsigned int pass);
int processResponseFromQueryBy(const char *response, const char *query, unsigned pass);
int processResponseFromCsv(const char *response);
eQueryByRemoteSocketRslt _queryByRemoteSocket(string query);
eQueryByRemoteSocketRslt processResponseFromQueryBy(const char *response, const char *query);
eQueryByRemoteSocketRslt processResponseFromCsv(const char *response);
virtual string prepareQuery(string query, bool nextPass);
virtual SqlDb_row fetchRow() = 0;
string fetchValue(int indexField = 0);
Expand Down
6 changes: 6 additions & 0 deletions sql_db_global.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ u_int64_t cSqlDbAutoIncrement::get_last_id(const char *table, const char *idColu
++tryCounter;
if(sqlDb->query(string("select ") + idColumn + " from " + sqlDb->escapeTableName(table) + " order by id desc limit 1")) {
break;
} else if(sqlDb->getErrorCode() == ER_ACCESS_DENIED_ERROR ||
sqlDb->getErrorCode() == ER_NO_SUCH_TABLE) {
break;
}
}
#endif
Expand Down Expand Up @@ -456,6 +459,9 @@ void cSqlDbCodebook::_load(map<string, unsigned> *data, bool *overflow, SqlDb *s
++tryCounter;
if(sqlDb->query("select * from " + table + (!condStr.empty() ? " where " + condStr : ""))) {
break;
} else if(sqlDb->getErrorCode() == ER_ACCESS_DENIED_ERROR ||
sqlDb->getErrorCode() == ER_NO_SUCH_TABLE) {
break;
}
}
vector<map<string, string_null> > *rows = sqlDb->get_rslt();
Expand Down
2 changes: 1 addition & 1 deletion sql_db_global.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ class cSqlDbData {
void initCodebooks(bool loadAll, unsigned limitTableRows, SqlDb *sqlDb);
void initAutoIncrement(SqlDb *sqlDb);
void lock_data() {
__SYNC_LOCK(_sync_data);
__SYNC_LOCK_USLEEP(_sync_data, 10);
}
void unlock_data() {
__SYNC_UNLOCK(_sync_data);
Expand Down
2 changes: 1 addition & 1 deletion tests_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,7 @@ void test() {
#else
string rslt = sqlDb->getJsonResult();
cout << rslt << endl;
sqlDb->processResponseFromQueryBy(rslt.c_str(), NULL, 0);
sqlDb->processResponseFromQueryBy(rslt.c_str(), NULL);
#endif
sqlDb->setCloudParameters("c", "c", "c");

Expand Down
3 changes: 3 additions & 0 deletions voipmonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,7 @@ extern bool opt_pcap_queues_mirror_require_confirmation;
extern bool opt_pcap_queues_mirror_use_checksum;
extern int opt_pcap_dispatch;
extern int sql_noerror;
extern int sql_disable_next_attempt_if_error;
int opt_cleandatabase_cdr = 0;
int opt_cleandatabase_cdr_rtp_energylevels = 0;
int opt_cleandatabase_ss7 = 0;
Expand Down Expand Up @@ -1916,8 +1917,10 @@ int SqlInitSchema(string *rsltConnectErrorString = NULL) {
if(connectOk > 0) {
if(isSqlDriver("mysql")) {
sql_noerror = 1;
sql_disable_next_attempt_if_error = 1;
sqlDb->query("repair table mysql.proc");
sql_noerror = 0;
sql_disable_next_attempt_if_error = 0;
}
sqlDb->checkDbMode();
if(!opt_database_backup) {
Expand Down

0 comments on commit 5f71876

Please sign in to comment.