diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index 1a4e8bd..4571449 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -589,13 +589,15 @@ AsyncClient::AsyncClient(tcp_pcb* pcb) _pcb = pcb; _closed_slot = -1; if(_pcb){ - _allocate_closed_slot(); _rx_last_packet = millis(); tcp_arg(_pcb, this); tcp_recv(_pcb, &_tcp_recv); tcp_sent(_pcb, &_tcp_sent); tcp_err(_pcb, &_tcp_error); tcp_poll(_pcb, &_tcp_poll, 1); + if(!_allocate_closed_slot()) { + _close(); + } } } @@ -705,6 +707,11 @@ bool AsyncClient::_connect(ip_addr_t addr, uint16_t port){ return false; } + if(!_allocate_closed_slot()) { + log_e("failed to allocate: closed slot full"); + return false; + } + tcp_pcb* pcb = tcp_new_ip_type(IPADDR_TYPE_ANY); if (!pcb){ log_e("pcb == NULL"); @@ -860,6 +867,7 @@ int8_t AsyncClient::_close(){ if(err != ERR_OK) { err = abort(); } + _free_closed_slot(); _pcb = NULL; if(_discard_cb) { _discard_cb(_discard_cb_arg, this); @@ -868,7 +876,10 @@ int8_t AsyncClient::_close(){ return err; } -void AsyncClient::_allocate_closed_slot(){ +bool AsyncClient::_allocate_closed_slot(){ + if (_closed_slot != -1) { + return true; + } xSemaphoreTake(_slots_lock, portMAX_DELAY); uint32_t closed_slot_min_index = 0; for (int i = 0; i < _number_of_closed_slots; ++ i) { @@ -881,14 +892,17 @@ void AsyncClient::_allocate_closed_slot(){ _closed_slots[_closed_slot] = 0; } xSemaphoreGive(_slots_lock); + return (_closed_slot != -1); } void AsyncClient::_free_closed_slot(){ + xSemaphoreTake(_slots_lock, portMAX_DELAY); if (_closed_slot != -1) { _closed_slots[_closed_slot] = _closed_index; _closed_slot = -1; ++ _closed_index; } + xSemaphoreGive(_slots_lock); } /* @@ -966,13 +980,19 @@ int8_t AsyncClient::_sent(tcp_pcb* pcb, uint16_t len) { } int8_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, int8_t err) { - while(pb != NULL) { + if(!_pcb || pcb != _pcb){ + log_e("0x%08x != 0x%08x", (uint32_t)pcb, (uint32_t)_pcb); + return ERR_OK; + } + size_t total = 0; + while((pb != NULL) && (ERR_OK == err)) { _rx_last_packet = millis(); //we should not ack before we assimilate the data _ack_pcb = true; pbuf *b = pb; pb = b->next; b->next = NULL; + total += b->len; if(_pb_cb){ _pb_cb(_pb_cb_arg, this, b); } else { @@ -981,13 +1001,12 @@ int8_t AsyncClient::_recv(tcp_pcb* pcb, pbuf* pb, int8_t err) { } if(!_ack_pcb) { _rx_ack_len += b->len; - } else if(_pcb) { - _tcp_recved(_pcb, _closed_slot, b->len); } - pbuf_free(b); } + pbuf_free(b); } - return ERR_OK; + err = _tcp_recved(pcb, _closed_slot, total); + return err; } int8_t AsyncClient::_poll(tcp_pcb* pcb){ @@ -1075,9 +1094,12 @@ size_t AsyncClient::write(const char* data) { size_t AsyncClient::write(const char* data, size_t size, uint8_t apiflags) { size_t will_send = add(data, size, apiflags); - if(!will_send || !send()) { + if(!will_send) { return 0; } + while (connected() && !send()) { + taskYIELD(); + } return will_send; } diff --git a/src/AsyncTCP.h b/src/AsyncTCP.h index b79afc0..144569b 100644 --- a/src/AsyncTCP.h +++ b/src/AsyncTCP.h @@ -222,7 +222,7 @@ class AsyncClient { int8_t _close(); void _free_closed_slot(); - void _allocate_closed_slot(); + bool _allocate_closed_slot(); int8_t _connected(tcp_pcb* pcb, int8_t err); void _error(int8_t err); int8_t _poll(tcp_pcb* pcb);