From fd6447f6cb4bc26116efc5a015883bd58a5cdf12 Mon Sep 17 00:00:00 2001 From: TienHuyIoTLab Date: Fri, 3 Nov 2023 23:50:59 +0700 Subject: [PATCH] Imrovement - Running async tcp by Core0 - Disable watchdog handler async tcp task - _tcp_recved() should be called when it has processed the data --- src/AsyncTCP.cpp | 68 ++++++++++++++++++++++++++++++------------------ src/AsyncTCP.h | 10 +++---- 2 files changed, 48 insertions(+), 30 deletions(-) diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index b3c8075..15f1e64 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -32,9 +32,11 @@ extern "C"{ #include "esp_task_wdt.h" #define CONFIG_ASYNC_TCP_STACK 2*8192 -#define CONFIG_ASYNC_TCP_PRIORITY 3 -#define CONFIG_ASYNC_TCP_QUEUE_SIZE 128 +#define CONFIG_ASYNC_TCP_PRIORITY 10 +#define CONFIG_ASYNC_TCP_QUEUE_SIZE 64 +#define ASYNC_TCP_PRINTFLN(f_, ...) Serial.printf_P(PSTR(f_ "\r\n"), ##__VA_ARGS__) +#define ASYNC_TCP_TAG_CONSOLE(...) ASYNC_TCP_PRINTFLN("[AsyncTCP]" __VA_ARGS__) /* * TCP/IP Event Task * */ @@ -48,7 +50,7 @@ typedef struct { void *arg; union { struct { - void * pcb; + tcp_pcb * pcb; int8_t err; } connected; struct { @@ -387,7 +389,7 @@ static esp_err_t _tcp_output(tcp_pcb * pcb, int8_t closed_slot) { if(!pcb){ return ERR_CONN; } - static tcp_api_call_t msg; + tcp_api_call_t msg; msg.pcb = pcb; msg.closed_slot = closed_slot; tcpip_api_call(_tcp_output_api, (struct tcpip_api_call_data*)&msg); @@ -407,7 +409,7 @@ static esp_err_t _tcp_write(tcp_pcb * pcb, int8_t closed_slot, const char* data, if(!pcb){ return ERR_CONN; } - static tcp_api_call_t msg; + tcp_api_call_t msg; msg.pcb = pcb; msg.closed_slot = closed_slot; msg.write.data = data; @@ -420,7 +422,7 @@ static esp_err_t _tcp_write(tcp_pcb * pcb, int8_t closed_slot, const char* data, static err_t _tcp_recved_api(struct tcpip_api_call_data *api_call_msg){ tcp_api_call_t * msg = (tcp_api_call_t *)api_call_msg; msg->err = ERR_CONN; - if(msg->closed_slot == -1 || !_closed_slots[msg->closed_slot]) { + if(msg->closed_slot != -1 || !_closed_slots[msg->closed_slot]) { msg->err = 0; tcp_recved(msg->pcb, msg->received); } @@ -431,7 +433,7 @@ static esp_err_t _tcp_recved(tcp_pcb * pcb, int8_t closed_slot, size_t len) { if(!pcb){ return ERR_CONN; } - static tcp_api_call_t msg; + tcp_api_call_t msg; msg.pcb = pcb; msg.closed_slot = closed_slot; msg.received = len; @@ -452,7 +454,7 @@ static esp_err_t _tcp_close(tcp_pcb * pcb, int8_t closed_slot) { if(!pcb){ return ERR_CONN; } - static tcp_api_call_t msg; + tcp_api_call_t msg; msg.pcb = pcb; msg.closed_slot = closed_slot; tcpip_api_call(_tcp_close_api, (struct tcpip_api_call_data*)&msg); @@ -472,7 +474,7 @@ static esp_err_t _tcp_abort(tcp_pcb * pcb, int8_t closed_slot) { if(!pcb){ return ERR_CONN; } - static tcp_api_call_t msg; + tcp_api_call_t msg; msg.pcb = pcb; msg.closed_slot = closed_slot; tcpip_api_call(_tcp_abort_api, (struct tcpip_api_call_data*)&msg); @@ -489,7 +491,7 @@ static esp_err_t _tcp_connect(tcp_pcb * pcb, int8_t closed_slot, ip_addr_t * add if(!pcb){ return ESP_FAIL; } - static tcp_api_call_t msg; + tcp_api_call_t msg; msg.pcb = pcb; msg.closed_slot = closed_slot; msg.connect.addr = addr; @@ -509,7 +511,7 @@ static esp_err_t _tcp_bind(tcp_pcb * pcb, ip_addr_t * addr, uint16_t port) { if(!pcb){ return ESP_FAIL; } - static tcp_api_call_t msg; + tcp_api_call_t msg; msg.pcb = pcb; msg.closed_slot = -1; msg.bind.addr = addr; @@ -529,7 +531,7 @@ static tcp_pcb * _tcp_listen_with_backlog(tcp_pcb * pcb, uint8_t backlog) { if(!pcb){ return NULL; } - static tcp_api_call_t msg; + tcp_api_call_t msg; msg.pcb = pcb; msg.closed_slot = -1; msg.backlog = backlog?backlog:0xFF; @@ -569,13 +571,15 @@ AsyncClient::AsyncClient(tcp_pcb* pcb) _pcb = pcb; _closed_slot = -1; if(_pcb){ - _allocate_closed_slot(); _rx_last_packet = millis(); tcp_arg(_pcb, this); tcp_recv(_pcb, &_tcp_recv); tcp_sent(_pcb, &_tcp_sent); tcp_err(_pcb, &_tcp_error); tcp_poll(_pcb, &_tcp_poll, 1); + if(!_allocate_closed_slot()) { + _close(); + } } } @@ -685,6 +689,11 @@ bool AsyncClient::connect(IPAddress ip, uint16_t port){ return false; } + if(!_allocate_closed_slot()) { + log_e("failed to closed slot full"); + return false; + } + ip_addr_t addr; addr.type = IPADDR_TYPE_V4; addr.u_addr.ip4.addr = ip; @@ -700,7 +709,6 @@ bool AsyncClient::connect(IPAddress ip, uint16_t port){ tcp_recv(pcb, &_tcp_recv); tcp_sent(pcb, &_tcp_sent); tcp_poll(pcb, &_tcp_poll, 1); - //_tcp_connect(pcb, &addr, port,(tcp_connected_fn)&_s_connected); _tcp_connect(pcb, _closed_slot, &addr, port,(tcp_connected_fn)&_tcp_connected); return true; } @@ -797,7 +805,7 @@ void AsyncClient::ackPacket(struct pbuf * pb){ * */ int8_t AsyncClient::_close(){ - //ets_printf("X: 0x%08x\n", (uint32_t)this); + // ets_printf("X: 0x%08x\n", (uint32_t)this); int8_t err = ERR_OK; if(_pcb) { //log_i(""); @@ -811,6 +819,7 @@ int8_t AsyncClient::_close(){ if(err != ERR_OK) { err = abort(); } + _free_closed_slot(); _pcb = NULL; if(_discard_cb) { _discard_cb(_discard_cb_arg, this); @@ -819,7 +828,10 @@ int8_t AsyncClient::_close(){ return err; } -void AsyncClient::_allocate_closed_slot(){ +boolean AsyncClient::_allocate_closed_slot(){ + if (_closed_slot != -1) { + return true; + } xSemaphoreTake(_slots_lock, portMAX_DELAY); uint32_t closed_slot_min_index = 0; for (int i = 0; i < _number_of_closed_slots; ++ i) { @@ -832,28 +844,28 @@ void AsyncClient::_allocate_closed_slot(){ _closed_slots[_closed_slot] = 0; } xSemaphoreGive(_slots_lock); + return (_closed_slot != -1); } void AsyncClient::_free_closed_slot(){ + xSemaphoreTake(_slots_lock, portMAX_DELAY); if (_closed_slot != -1) { _closed_slots[_closed_slot] = _closed_index; _closed_slot = -1; ++ _closed_index; } + xSemaphoreGive(_slots_lock); } /* * Private Callbacks * */ -int8_t AsyncClient::_connected(void* pcb, int8_t err){ +int8_t AsyncClient::_connected(tcp_pcb* pcb, int8_t err){ _pcb = reinterpret_cast(pcb); if(_pcb){ _rx_last_packet = millis(); _pcb_busy = false; -// tcp_recv(_pcb, &_tcp_recv); -// tcp_sent(_pcb, &_tcp_sent); -// tcp_poll(_pcb, &_tcp_poll, 1); } if(_connect_cb) { _connect_cb(_connect_cb_arg, this); @@ -870,6 +882,7 @@ void AsyncClient::_error(int8_t err) { tcp_err(_pcb, NULL); tcp_poll(_pcb, NULL, 0); } + _free_closed_slot(); _pcb = NULL; } if(_error_cb) { @@ -921,13 +934,19 @@ int8_t AsyncClient::_sent(tcp_pcb* pcb, uint16_t len) { } int8_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, int8_t err) { - while(pb != NULL) { + if(!_pcb || pcb != _pcb){ + log_e("0x%08x != 0x%08x", (uint32_t)pcb, (uint32_t)_pcb); + return ERR_OK; + } + size_t total = 0; + while((pb != NULL) && (ERR_OK == err)) { _rx_last_packet = millis(); //we should not ack before we assimilate the data _ack_pcb = true; pbuf *b = pb; pb = b->next; b->next = NULL; + total += b->len; if(_pb_cb){ _pb_cb(_pb_cb_arg, this, b); } else { @@ -936,13 +955,12 @@ int8_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, int8_t err) { } if(!_ack_pcb) { _rx_ack_len += b->len; - } else if(_pcb) { - _tcp_recved(_pcb, _closed_slot, b->len); } } pbuf_free(b); } - return ERR_OK; + err = _tcp_recved(pcb, _closed_slot, total); + return err; } int8_t AsyncClient::_poll(tcp_pcb* pcb){ @@ -1242,7 +1260,7 @@ void AsyncClient::_s_error(void * arg, int8_t err) { reinterpret_cast(arg)->_error(err); } -int8_t AsyncClient::_s_connected(void * arg, void * pcb, int8_t err){ +int8_t AsyncClient::_s_connected(void * arg, struct tcp_pcb * pcb, int8_t err){ return reinterpret_cast(arg)->_connected(pcb, err); } diff --git a/src/AsyncTCP.h b/src/AsyncTCP.h index 8d2f6ce..30a9c5a 100644 --- a/src/AsyncTCP.h +++ b/src/AsyncTCP.h @@ -33,8 +33,8 @@ extern "C" { //If core is not defined, then we are running in Arduino or PIO #ifndef CONFIG_ASYNC_TCP_RUNNING_CORE -#define CONFIG_ASYNC_TCP_RUNNING_CORE -1 //any available core -#define CONFIG_ASYNC_TCP_USE_WDT 1 //if enabled, adds between 33us and 200us per event +#define CONFIG_ASYNC_TCP_RUNNING_CORE 0 //any available core +#define CONFIG_ASYNC_TCP_USE_WDT 0 //if enabled, adds between 33us and 200us per event #endif class AsyncClient; @@ -135,7 +135,7 @@ class AsyncClient { static int8_t _s_lwip_fin(void *arg, struct tcp_pcb *tpcb, int8_t err); static void _s_error(void *arg, int8_t err); static int8_t _s_sent(void *arg, struct tcp_pcb *tpcb, uint16_t len); - static int8_t _s_connected(void* arg, void* tpcb, int8_t err); + static int8_t _s_connected(void* arg, struct tcp_pcb *tpcb, int8_t err); static void _s_dns_found(const char *name, struct ip_addr *ipaddr, void *arg); int8_t _recv(tcp_pcb* pcb, pbuf* pb, int8_t err); @@ -173,8 +173,8 @@ class AsyncClient { int8_t _close(); void _free_closed_slot(); - void _allocate_closed_slot(); - int8_t _connected(void* pcb, int8_t err); + boolean _allocate_closed_slot(); + int8_t _connected(tcp_pcb* pcb, int8_t err); void _error(int8_t err); int8_t _poll(tcp_pcb* pcb); int8_t _sent(tcp_pcb* pcb, uint16_t len);