diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index c21b85d..6902167 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -4,6 +4,7 @@ #include "Arduino.h" #include "AsyncTCP.h" +#include "simple_intrusive_list.h" extern "C" { #include "lwip/dns.h" @@ -11,10 +12,15 @@ extern "C" { #include "lwip/inet.h" #include "lwip/opt.h" #include "lwip/tcp.h" +#include "lwip/tcpip.h" +#include "lwip/sys.h" } #if CONFIG_ASYNC_TCP_USE_WDT #include "esp_task_wdt.h" +#define ASYNC_TCP_MAX_TASK_SLEEP (pdMS_TO_TICKS(1000 * CONFIG_ESP_TASK_WDT_TIMEOUT_S) / 4) +#else +#define ASYNC_TCP_MAX_TASK_SLEEP portMAX_DELAY #endif // Required for: @@ -24,22 +30,29 @@ extern "C" { #endif // https://github.com/espressif/arduino-esp32/issues/10526 +namespace { #ifdef CONFIG_LWIP_TCPIP_CORE_LOCKING -#define TCP_MUTEX_LOCK() \ - if (!sys_thread_tcpip(LWIP_CORE_LOCK_QUERY_HOLDER)) { \ - LOCK_TCPIP_CORE(); \ +struct tcp_core_guard { + bool do_lock; + inline tcp_core_guard() : do_lock(!sys_thread_tcpip(LWIP_CORE_LOCK_QUERY_HOLDER)) { + if (do_lock) { + LOCK_TCPIP_CORE(); + } } - -#define TCP_MUTEX_UNLOCK() \ - if (sys_thread_tcpip(LWIP_CORE_LOCK_QUERY_HOLDER)) { \ - UNLOCK_TCPIP_CORE(); \ + inline ~tcp_core_guard() { + if (do_lock) { + UNLOCK_TCPIP_CORE(); + } } -#else // CONFIG_LWIP_TCPIP_CORE_LOCKING -#define TCP_MUTEX_LOCK() -#define TCP_MUTEX_UNLOCK() + tcp_core_guard(const tcp_core_guard &) = delete; + tcp_core_guard(tcp_core_guard &&) = delete; + tcp_core_guard &operator=(const tcp_core_guard &) = delete; + tcp_core_guard &operator=(tcp_core_guard &&) = delete; +}; +#else // CONFIG_LWIP_TCPIP_CORE_LOCKING +struct tcp_core_guard {}; #endif // CONFIG_LWIP_TCPIP_CORE_LOCKING - -#define INVALID_CLOSED_SLOT -1 +} // anonymous namespace /* TCP poll interval is specified in terms of the TCP coarse timer interval, which is called twice a second @@ -47,6 +60,12 @@ extern "C" { */ #define CONFIG_ASYNC_TCP_POLL_TIMER 1 +#ifdef ASYNC_TCP_DEBUG +#define DEBUG_PRINTF(...) 
log_d(__VA_ARGS__) +#else +#define DEBUG_PRINTF(...) +#endif + /* * TCP/IP Event Task * */ @@ -63,221 +82,274 @@ typedef enum { LWIP_TCP_DNS } lwip_tcp_event_t; -typedef struct { +struct lwip_tcp_event_packet_t { + lwip_tcp_event_packet_t *next; lwip_tcp_event_t event; - void *arg; + AsyncClient *client; +#ifdef ASYNCTCP_VALIDATE_PCB + tcp_pcb *pcb; +#endif union { struct { - tcp_pcb *pcb; int8_t err; } connected; struct { int8_t err; } error; struct { - tcp_pcb *pcb; - uint16_t len; - } sent; - struct { - tcp_pcb *pcb; pbuf *pb; int8_t err; } recv; struct { - tcp_pcb *pcb; int8_t err; } fin; struct { - tcp_pcb *pcb; - } poll; - struct { - AsyncClient *client; + AsyncServer *server; } accept; struct { const char *name; ip_addr_t addr; } dns; }; -} lwip_tcp_event_packet_t; - -static QueueHandle_t _async_queue = NULL; -static TaskHandle_t _async_service_task_handle = NULL; +}; + +// Detail class for interacting with AsyncClient internals, but without exposing the API to other parts of the program +class AsyncClient_detail { +public: + static inline lwip_tcp_event_packet_t *invalidate_pcb(AsyncClient &client); + static inline lwip_tcp_event_packet_t *get_async_event(); + static void __attribute__((visibility("internal"))) handle_async_event(lwip_tcp_event_packet_t *event); + + // TCP event callbacks + static int8_t __attribute__((visibility("internal"))) tcp_recv(void *arg, struct tcp_pcb *pcb, struct pbuf *pb, int8_t err); + static int8_t __attribute__((visibility("internal"))) tcp_sent(void *arg, struct tcp_pcb *pcb, uint16_t len); + static void __attribute__((visibility("internal"))) tcp_error(void *arg, int8_t err); + static int8_t __attribute__((visibility("internal"))) tcp_poll(void *arg, struct tcp_pcb *pcb); +}; + +// helper function +static lwip_tcp_event_packet_t *_alloc_event(lwip_tcp_event_t event, AsyncClient *client, tcp_pcb *pcb) { + // Validation check + if (pcb && (client->pcb() != pcb)) { + // Client structure is corrupt? 
+ log_e("Client mismatch allocating event for 0x%08x 0x%08x vs 0x%08x", (intptr_t)client, (intptr_t)pcb, client->pcb()); + tcp_abort(pcb); + AsyncClient_detail::tcp_error(client, ERR_ARG); + return nullptr; + } + + auto *e = new (std::nothrow) lwip_tcp_event_packet_t{nullptr, event, client}; -static SemaphoreHandle_t _slots_lock = NULL; -static const int _number_of_closed_slots = CONFIG_LWIP_MAX_ACTIVE_TCP; -static uint32_t _closed_slots[_number_of_closed_slots]; -static uint32_t _closed_index = []() { - _slots_lock = xSemaphoreCreateBinary(); - configASSERT(_slots_lock); // Add sanity check - xSemaphoreGive(_slots_lock); - for (int i = 0; i < _number_of_closed_slots; ++i) { - _closed_slots[i] = 1; - } - return 1; -}(); - -static inline bool _init_async_event_queue() { - if (!_async_queue) { - _async_queue = xQueueCreate(CONFIG_ASYNC_TCP_QUEUE_SIZE, sizeof(lwip_tcp_event_packet_t *)); - if (!_async_queue) { - return false; + if (!e) { + // Allocation fail - abort client and give up + log_e("OOM allocating event for 0x%08x 0x%08x", (intptr_t)client, (intptr_t)pcb); + if (pcb) { + tcp_abort(pcb); } + AsyncClient_detail::tcp_error(client, ERR_MEM); + return nullptr; } - return true; -} -static inline bool _send_async_event(lwip_tcp_event_packet_t **e, TickType_t wait = portMAX_DELAY) { - return _async_queue && xQueueSend(_async_queue, e, wait) == pdPASS; +#ifdef ASYNCTCP_VALIDATE_PCB + e->pcb = pcb; +#endif + DEBUG_PRINTF("_AE: 0x%08x -> %d 0x%08x 0x%08x", (intptr_t)e, (int)event, (intptr_t)client, (intptr_t)pcb); + return e; } -static inline bool _prepend_async_event(lwip_tcp_event_packet_t **e, TickType_t wait = portMAX_DELAY) { - return _async_queue && xQueueSendToFront(_async_queue, e, wait) == pdPASS; +static void _free_event(lwip_tcp_event_packet_t *evpkt) { + DEBUG_PRINTF("_FE: 0x%08x -> %d 0x%08x [0x%08x]", (intptr_t)evpkt, (int)evpkt->event, (intptr_t)evpkt->client, (intptr_t)evpkt->next); + if ((evpkt->event == LWIP_TCP_RECV) && (evpkt->recv.pb != nullptr)) 
{ + pbuf_free(evpkt->recv.pb); + } + delete evpkt; } -static inline bool _get_async_event(lwip_tcp_event_packet_t **e) { - while (true) { - if (!_async_queue) { - break; - } +// Global variables +static simple_intrusive_list _async_queue; +static TaskHandle_t _async_service_task_handle = NULL; -#if CONFIG_ASYNC_TCP_USE_WDT - // need to return periodically to feed the dog - if (xQueueReceive(_async_queue, e, pdMS_TO_TICKS(1000)) != pdPASS) { - break; - } +namespace { +#if defined(CONFIG_LWIP_TCPIP_CORE_LOCKING) && defined(CONFIG_ASYNC_TCP_QUEUE_LWIP_LOCK) +typedef tcp_core_guard queue_mutex_guard; #else - if (xQueueReceive(_async_queue, e, portMAX_DELAY) != pdPASS) { - break; +class queue_mutex_guard { + + // Create-on-first-use idiom for an embedded mutex + static SemaphoreHandle_t _async_queue_mutex() { + + static SemaphoreHandle_t mutex = xSemaphoreCreateMutex(); + assert(mutex != nullptr); + return mutex; + }; + + bool holds_mutex; + +public: + inline queue_mutex_guard() : holds_mutex(xSemaphoreTake(_async_queue_mutex(), portMAX_DELAY)){}; + inline ~queue_mutex_guard() { + if (holds_mutex) { + xSemaphoreGive(_async_queue_mutex()); } + }; + inline explicit operator bool() const { + return holds_mutex; + }; +}; #endif +} // namespace - if ((*e)->event != LWIP_TCP_POLL) { - return true; - } +static inline bool _init_async_event_queue() { + return true; +} - /* - Let's try to coalesce two (or more) consecutive poll events into one - this usually happens with poor implemented user-callbacks that are runs too long and makes poll events to stack in the queue - if consecutive user callback for a same connection runs longer that poll time then it will fill the queue with events until it deadlocks. - This is a workaround to mitigate such poor designs and won't let other events/connections to starve the task time. - It won't be effective if user would run multiple simultaneous long running callbacks due to message interleaving. 
- todo: implement some kind of fair dequeuing or (better) simply punish user for a bad designed callbacks by resetting hog connections - */ - lwip_tcp_event_packet_t *next_pkt = NULL; - while (xQueuePeek(_async_queue, &next_pkt, 0) == pdPASS) { - // if the next event that will come is a poll event for the same connection, we can discard it and continue - if (next_pkt->arg == (*e)->arg && next_pkt->event == LWIP_TCP_POLL) { - if (xQueueReceive(_async_queue, &next_pkt, 0) == pdPASS) { - free(next_pkt); - next_pkt = NULL; - log_d("coalescing polls, network congestion or async callbacks might be too slow!"); - continue; - } - } +static inline void _send_async_event(lwip_tcp_event_packet_t *e) { + assert(e != nullptr); + _async_queue.push_back(e); +#ifdef ASYNC_TCP_DEBUG + uint32_t n; + xTaskNotifyAndQuery(_async_service_task_handle, 1, eIncrement, &n); + DEBUG_PRINTF("0x%08x", (intptr_t)e); +#else + xTaskNotifyGive(_async_service_task_handle); +#endif +} - // quit while loop if next incoming event can't be discarded (not a poll event) - break; - } +static inline void _prepend_async_event(lwip_tcp_event_packet_t *e) { + assert(e != nullptr); + _async_queue.push_front(e); - /* - now we have to decide if to proceed with poll callback handler or discard it? - poor designed apps using asynctcp without proper dataflow control could flood the queue with interleaved pool/ack events. - I.e. on each poll app would try to generate more data to send, which in turn results in additional ack event triggering chain effect - for long connections. Or poll callback could take long time starving other connections. Anyway our goal is to keep the queue length - grows under control (if possible) and poll events are the safest to discard. 
- Let's discard poll events processing using linear-increasing probability curve when queue size grows over 3/4 - Poll events are periodic and connection could get another chance next time - */ - if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4)) { - free(*e); - *e = NULL; - log_d("discarding poll due to queue congestion"); - continue; // continue main loop to dequeue next event which we know is not a poll event - } - return true; // queue not nearly full, caller can process the poll event - } - return false; +#ifdef ASYNC_TCP_DEBUG + uint32_t n; + xTaskNotifyAndQuery(_async_service_task_handle, 1, eIncrement, &n); + DEBUG_PRINTF("0x%08x", (intptr_t)e); +#else + xTaskNotifyGive(_async_service_task_handle); +#endif } -static bool _remove_events_with_arg(void *arg) { - if (!_async_queue) { - return false; - } +static void _remove_events_for(AsyncClient *client) { +#ifdef ASYNC_TCP_DEBUG + auto start_length = _async_queue.size(); +#endif - lwip_tcp_event_packet_t *first_packet = NULL; - lwip_tcp_event_packet_t *packet = NULL; + auto removed_event_chain = _async_queue.remove_if([=](lwip_tcp_event_packet_t &pkt) { + return pkt.client == client; + }); - // figure out which is the first non-matching packet so we can keep the order - while (!first_packet) { - if (xQueueReceive(_async_queue, &first_packet, 0) != pdPASS) { - return false; - } - // discard packet if matching - if ((uintptr_t)first_packet->arg == (uintptr_t)arg) { - free(first_packet); - first_packet = NULL; - } else if (xQueueSend(_async_queue, &first_packet, 0) != pdPASS) { - // try to return first packet to the back of the queue - // we can't wait here if queue is full, because this call has been done from the only consumer task of this queue - // otherwise it would deadlock, we have to discard the event - free(first_packet); - first_packet = NULL; - return false; - } + size_t count = 0; + while (removed_event_chain) { + ++count; + 
auto t = removed_event_chain; + removed_event_chain = t->next; + _free_event(t); } - while (xQueuePeek(_async_queue, &packet, 0) == pdPASS && packet != first_packet) { - if (xQueueReceive(_async_queue, &packet, 0) != pdPASS) { - return false; - } - if ((uintptr_t)packet->arg == (uintptr_t)arg) { - // remove matching event - free(packet); - packet = NULL; - // otherwise try to requeue it - } else if (xQueueSend(_async_queue, &packet, 0) != pdPASS) { - // we can't wait here if queue is full, because this call has been done from the only consumer task of this queue - // otherwise it would deadlock, we have to discard the event - free(packet); - packet = NULL; - return false; - } +#ifdef ASYNC_TCP_DEBUG + auto end_length = _async_queue.size(); + assert(count + end_length == start_length); + assert(_async_queue.validate_tail()); + + DEBUG_PRINTF("Removed %d/%d for 0x%08x", count, start_length, (intptr_t)client); +#endif +}; + +static lwip_tcp_event_packet_t *_register_pcb(tcp_pcb *pcb, AsyncClient *client) { + // do client-specific setup + auto end_event = _alloc_event(LWIP_TCP_ERROR, client, pcb); + if (end_event) { + tcp_arg(pcb, client); + tcp_recv(pcb, &AsyncClient_detail::tcp_recv); + tcp_sent(pcb, &AsyncClient_detail::tcp_sent); + tcp_err(pcb, &AsyncClient_detail::tcp_error); + tcp_poll(pcb, &AsyncClient_detail::tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER); + }; + return end_event; +} + +static void _teardown_pcb(tcp_pcb *pcb) { + assert(pcb); + // Do teardown + tcp_arg(pcb, NULL); + tcp_sent(pcb, NULL); + tcp_recv(pcb, NULL); + tcp_err(pcb, NULL); + tcp_poll(pcb, NULL, 0); +} + +inline lwip_tcp_event_packet_t *AsyncClient_detail::invalidate_pcb(AsyncClient &client) { + auto end_event = client._end_event; + _teardown_pcb(client._pcb); + client._pcb = nullptr; + client._end_event = nullptr; + _remove_events_for(&client); +#ifdef CONFIG_ASYNC_TCP_COALESCE_RECV + client._recv_pending = nullptr; // killed by _remove_events_for +#endif + return end_event; +}; + +inline 
lwip_tcp_event_packet_t *AsyncClient_detail::get_async_event() { queue_mutex_guard guard; lwip_tcp_event_packet_t *e = _async_queue.pop_front(); // special case: override values while holding the lock +#ifdef CONFIG_ASYNC_TCP_COALESCE_RECV + if (e && (e->event == LWIP_TCP_RECV) && (e->client->_recv_pending == e)) { e->client->_recv_pending = nullptr; } - return true; +#endif + DEBUG_PRINTF("0x%08x", (intptr_t)e); + return e; } -static void _handle_async_event(lwip_tcp_event_packet_t *e) { - if (e->arg == NULL) { - // do nothing when arg is NULL - // ets_printf("event arg == NULL: 0x%08x\n", e->recv.pcb); - } else if (e->event == LWIP_TCP_CLEAR) { - _remove_events_with_arg(e->arg); - } else if (e->event == LWIP_TCP_RECV) { - // ets_printf("-R: 0x%08x\n", e->recv.pcb); - AsyncClient::_s_recv(e->arg, e->recv.pcb, e->recv.pb, e->recv.err); +void AsyncClient_detail::handle_async_event(lwip_tcp_event_packet_t *e) { + // Special cases first + if (e->event == LWIP_TCP_ERROR) { + DEBUG_PRINTF("-E: 0x%08x %d", e->client, e->error.err); + // Special case: pcb is now invalid, and will have been null'd out by the lwip thread + if (e->client) { + e->client->_error(e->error.err); + } + } else if (e->event == LWIP_TCP_DNS) { // client has no PCB allocated yet + DEBUG_PRINTF("-D: 0x%08x %s = %s", e->client, e->dns.name, ipaddr_ntoa(&e->dns.addr)); + e->client->_dns_found(&e->dns.addr); + } + // Now check for client pointer + else if (e->client->pcb() == NULL) { + // This can only happen if event processing is racing with closing or destruction in a third task. + // Drop the event and do nothing. + DEBUG_PRINTF("event client pcb == NULL: 0x%08x", e->client); + } +#ifdef ASYNCTCP_VALIDATE_PCB + else if (e->client->pcb() != e->pcb) { + log_e("event client pcb mismatch: 0x%08x -> 0x%08x vs 0x%08x", e->client, e->client->pcb(), e->pcb); + } +#endif + // OK, process other events + // TODO: is a switch-case more code efficient? 
+ else if (e->event == LWIP_TCP_RECV) { + DEBUG_PRINTF("-R: 0x%08x", e->client->_pcb); + e->client->_recv(e->recv.pb, e->recv.err); + e->recv.pb = nullptr; // Consumed by client } else if (e->event == LWIP_TCP_FIN) { - // ets_printf("-F: 0x%08x\n", e->fin.pcb); - AsyncClient::_s_fin(e->arg, e->fin.pcb, e->fin.err); + DEBUG_PRINTF("-F: 0x%08x", e->client->_pcb); + e->client->_fin(e->fin.err); } else if (e->event == LWIP_TCP_SENT) { - // ets_printf("-S: 0x%08x\n", e->sent.pcb); - AsyncClient::_s_sent(e->arg, e->sent.pcb, e->sent.len); + DEBUG_PRINTF("-S: 0x%08x", e->client->_pcb); + auto sent = e->client->_sent_pending.exchange(0); + e->client->_sent(sent); } else if (e->event == LWIP_TCP_POLL) { - // ets_printf("-P: 0x%08x\n", e->poll.pcb); - AsyncClient::_s_poll(e->arg, e->poll.pcb); - } else if (e->event == LWIP_TCP_ERROR) { - // ets_printf("-E: 0x%08x %d\n", e->arg, e->error.err); - AsyncClient::_s_error(e->arg, e->error.err); + DEBUG_PRINTF("-P: 0x%08x", e->client->_pcb); + e->client->_polls_pending.store(0); + e->client->_poll(); } else if (e->event == LWIP_TCP_CONNECTED) { - // ets_printf("C: 0x%08x 0x%08x %d\n", e->arg, e->connected.pcb, e->connected.err); - AsyncClient::_s_connected(e->arg, e->connected.pcb, e->connected.err); + DEBUG_PRINTF("-C: 0x%08x 0x%08x %d", e->client, e->client->_pcb, e->connected.err); + e->client->_connected(e->connected.err); } else if (e->event == LWIP_TCP_ACCEPT) { - // ets_printf("A: 0x%08x 0x%08x\n", e->arg, e->accept.client); - AsyncServer::_s_accepted(e->arg, e->accept.client); - } else if (e->event == LWIP_TCP_DNS) { - // ets_printf("D: 0x%08x %s = %s\n", e->arg, e->dns.name, ipaddr_ntoa(&e->dns.addr)); - AsyncClient::_s_dns_found(e->dns.name, &e->dns.addr, e->arg); + DEBUG_PRINTF("-A: 0x%08x 0x%08x", e->client, e->accept.server); + e->accept.server->_accepted(e->client); } - free((void *)(e)); + _free_event(e); } static void _async_service_task(void *pvParameters) { @@ -286,11 +358,17 @@ static void 
_async_service_task(void *pvParameters) { log_w("Failed to add async task to WDT"); } #endif - lwip_tcp_event_packet_t *packet = NULL; for (;;) { - if (_get_async_event(&packet)) { - _handle_async_event(packet); + while (auto packet = AsyncClient_detail::get_async_event()) { + AsyncClient_detail::handle_async_event(packet); +#if CONFIG_ASYNC_TCP_USE_WDT + esp_task_wdt_reset(); +#endif } + // queue is empty + // DEBUG_PRINTF("Async task waiting 0x%08",(intptr_t)_async_queue_head); + ulTaskNotifyTake(pdTRUE, ASYNC_TCP_MAX_TASK_SLEEP); + // DEBUG_PRINTF("Async task woke = %d 0x%08x",q, (intptr_t)_async_queue_head); #if CONFIG_ASYNC_TCP_USE_WDT esp_task_wdt_reset(); #endif @@ -301,6 +379,7 @@ static void _async_service_task(void *pvParameters) { vTaskDelete(NULL); _async_service_task_handle = NULL; } + /* static void _stop_async_task(){ if(_async_service_task_handle){ @@ -344,173 +423,147 @@ static bool _start_async_task() { * LwIP Callbacks * */ -static int8_t _tcp_clear_events(void *arg) { - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); - if (!e) { - log_e("Failed to allocate event packet"); - return ERR_MEM; - } - e->event = LWIP_TCP_CLEAR; - e->arg = arg; - if (!_prepend_async_event(&e)) { - free((void *)(e)); - return ERR_TIMEOUT; - } - return ERR_OK; -} - static int8_t _tcp_connected(void *arg, tcp_pcb *pcb, int8_t err) { - // ets_printf("+C: 0x%08x\n", pcb); - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); - if (!e) { - log_e("Failed to allocate event packet"); + DEBUG_PRINTF("+C: 0x%08x", pcb); + AsyncClient *client = reinterpret_cast(arg); + lwip_tcp_event_packet_t *e = _alloc_event(LWIP_TCP_CONNECTED, client, pcb); + if (e == nullptr) { return ERR_MEM; } - e->event = LWIP_TCP_CONNECTED; - e->arg = arg; - e->connected.pcb = pcb; e->connected.err = err; - if (!_prepend_async_event(&e)) { - free((void *)(e)); - return ERR_TIMEOUT; - } + queue_mutex_guard guard; + 
_send_async_event(e); return ERR_OK; } -static int8_t _tcp_poll(void *arg, struct tcp_pcb *pcb) { - // throttle polling events queueing when event queue is getting filled up, let it handle _onack's - // log_d("qs:%u", uxQueueMessagesWaiting(_async_queue)); - if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 2 + CONFIG_ASYNC_TCP_QUEUE_SIZE / 4)) { - log_d("throttling"); +int8_t AsyncClient_detail::tcp_poll(void *arg, struct tcp_pcb *pcb) { + DEBUG_PRINTF("+P: 0x%08x", pcb); + AsyncClient *client = reinterpret_cast(arg); + + // Coalesce event, if possible + if (client->_polls_pending++ != 0) { return ERR_OK; } - // ets_printf("+P: 0x%08x\n", pcb); - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); - if (!e) { - log_e("Failed to allocate event packet"); + lwip_tcp_event_packet_t *e = _alloc_event(LWIP_TCP_POLL, client, pcb); + if (e == nullptr) { return ERR_MEM; } - e->event = LWIP_TCP_POLL; - e->arg = arg; - e->poll.pcb = pcb; - // poll events are not critical 'cause those are repetitive, so we may not wait the queue in any case - if (!_send_async_event(&e, 0)) { - free((void *)(e)); - return ERR_TIMEOUT; - } + + queue_mutex_guard guard; + _send_async_event(e); return ERR_OK; } -static int8_t _tcp_recv(void *arg, struct tcp_pcb *pcb, struct pbuf *pb, int8_t err) { - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); - if (!e) { - log_e("Failed to allocate event packet"); - return ERR_MEM; - } - e->arg = arg; +int8_t AsyncClient_detail::tcp_recv(void *arg, struct tcp_pcb *pcb, struct pbuf *pb, int8_t err) { + AsyncClient *client = reinterpret_cast(arg); + if (pb) { - // ets_printf("+R: 0x%08x\n", pcb); - e->event = LWIP_TCP_RECV; - e->recv.pcb = pcb; + DEBUG_PRINTF("+R: 0x%08x", pcb); + queue_mutex_guard guard; + + // Coalesce event, if possible +#ifdef CONFIG_ASYNC_TCP_COALESCE_RECV + if ((err == 0) && (client->_recv_pending)) { + 
pbuf_cat(client->_recv_pending->recv.pb, pb); + return ERR_OK; + } +#endif + + lwip_tcp_event_packet_t *e = _alloc_event(LWIP_TCP_RECV, client, pcb); + if (e == nullptr) { + pbuf_free(pb); + return ERR_MEM; // TODO - error handling??? + } + e->recv.pb = pb; e->recv.err = err; +#ifdef CONFIG_ASYNC_TCP_COALESCE_RECV + if (err == 0) { + client->_recv_pending = e; + } else { + // if we had one, we can no longer extend it + client->_recv_pending = nullptr; + } +#endif + + _send_async_event(e); } else { - // ets_printf("+F: 0x%08x\n", pcb); - e->event = LWIP_TCP_FIN; - e->fin.pcb = pcb; + DEBUG_PRINTF("+F: 0x%08x -> 0x%08x", pcb, arg); + queue_mutex_guard guard; + lwip_tcp_event_packet_t *e = _alloc_event(LWIP_TCP_FIN, client, pcb); + if (e == nullptr) { + return ERR_MEM; + } + e->fin.err = err; - // close the PCB in LwIP thread - AsyncClient::_s_lwip_fin(e->arg, e->fin.pcb, e->fin.err); - } - if (!_send_async_event(&e)) { - free((void *)(e)); - return ERR_TIMEOUT; + _send_async_event(e); } + return ERR_OK; } -static int8_t _tcp_sent(void *arg, struct tcp_pcb *pcb, uint16_t len) { - // ets_printf("+S: 0x%08x\n", pcb); - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); - if (!e) { - log_e("Failed to allocate event packet"); +int8_t AsyncClient_detail::tcp_sent(void *arg, struct tcp_pcb *pcb, uint16_t len) { + DEBUG_PRINTF("+S: 0x%08x", pcb); + AsyncClient *client = reinterpret_cast<AsyncClient *>(arg); + + // Coalesce event, if possible + auto _prev_pending = client->_sent_pending.fetch_add(len); + if (_prev_pending) return ERR_OK; + + lwip_tcp_event_packet_t *e = _alloc_event(LWIP_TCP_SENT, client, pcb); + if (e == nullptr) { return ERR_MEM; } - e->event = LWIP_TCP_SENT; - e->arg = arg; - e->sent.pcb = pcb; - e->sent.len = len; - if (!_send_async_event(&e)) { - free((void *)(e)); - return ERR_TIMEOUT; - } + + queue_mutex_guard guard; + _send_async_event(e); return ERR_OK; } -void AsyncClient::_tcp_error(void *arg, int8_t err) { - // 
ets_printf("+E: 0x%08x\n", arg); +void AsyncClient_detail::tcp_error(void *arg, int8_t err) { + DEBUG_PRINTF("+E: 0x%08x %d", arg, err); AsyncClient *client = reinterpret_cast(arg); - if (client && client->_pcb) { - tcp_arg(client->_pcb, NULL); - if (client->_pcb->state == LISTEN) { - tcp_sent(client->_pcb, NULL); - tcp_recv(client->_pcb, NULL); - tcp_err(client->_pcb, NULL); - tcp_poll(client->_pcb, NULL, 0); - } - client->_pcb = nullptr; - client->_free_closed_slot(); - } - - // enqueue event to be processed in the async task for the user callback - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); - if (!e) { - log_e("Failed to allocate event packet"); - return; - } - e->event = LWIP_TCP_ERROR; - e->arg = arg; - e->error.err = err; - if (!_send_async_event(&e)) { - ::free((void *)(e)); + assert(client); + // The associated pcb is now invalid and will soon be deallocated + // We call on the preallocated end event from the client object + queue_mutex_guard guard; + lwip_tcp_event_packet_t *e = AsyncClient_detail::invalidate_pcb(*client); + if (e) { + e->error.err = err; + _prepend_async_event(e); + } else { + log_e("tcp_error call for client %X with no end event", (intptr_t)client); } } static void _tcp_dns_found(const char *name, struct ip_addr *ipaddr, void *arg) { - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); - if (!e) { - log_e("Failed to allocate event packet"); - return; - } - // ets_printf("+DNS: name=%s ipaddr=0x%08x arg=%x\n", name, ipaddr, arg); - e->event = LWIP_TCP_DNS; - e->arg = arg; - e->dns.name = name; - if (ipaddr) { - memcpy(&e->dns.addr, ipaddr, sizeof(struct ip_addr)); - } else { - memset(&e->dns.addr, 0, sizeof(e->dns.addr)); - } - if (!_send_async_event(&e)) { - free((void *)(e)); + DEBUG_PRINTF("+DNS: name=%s ipaddr=0x%08x arg=%x", name, ipaddr, arg); + auto client = reinterpret_cast(arg); + lwip_tcp_event_packet_t *e = 
_alloc_event(LWIP_TCP_DNS, client, client->pcb()); + if (e != nullptr) { + e->dns.name = name; + if (ipaddr) { + memcpy(&e->dns.addr, ipaddr, sizeof(struct ip_addr)); + } else { + memset(&e->dns.addr, 0, sizeof(e->dns.addr)); + } + + queue_mutex_guard guard; + _send_async_event(e); } } -// Used to switch out from LwIP thread -static int8_t _tcp_accept(void *arg, AsyncClient *client) { - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); - if (!e) { - log_e("Failed to allocate event packet"); +// Runs on LWIP thread +static int8_t _tcp_accept(AsyncServer *server, AsyncClient *client) { + lwip_tcp_event_packet_t *e = _alloc_event(LWIP_TCP_ACCEPT, client, client->pcb()); + if (e == nullptr) { return ERR_MEM; } - e->event = LWIP_TCP_ACCEPT; - e->arg = arg; - e->accept.client = client; - if (!_prepend_async_event(&e)) { - free((void *)(e)); - return ERR_TIMEOUT; - } + e->accept.server = server; + + queue_mutex_guard guard; + _send_async_event(e); return ERR_OK; } @@ -522,8 +575,7 @@ static int8_t _tcp_accept(void *arg, AsyncClient *client) { typedef struct { struct tcpip_api_call_data call; - tcp_pcb *pcb; - int8_t closed_slot; + tcp_pcb **pcb_ptr; // double indirection to manage races with client threads int8_t err; union { struct { @@ -533,7 +585,7 @@ typedef struct { } write; size_t received; struct { - ip_addr_t *addr; + const ip_addr_t *addr; uint16_t port; tcp_connected_fn cb; } connect; @@ -541,93 +593,78 @@ typedef struct { ip_addr_t *addr; uint16_t port; } bind; + AsyncClient *client; // used for close() and abort() uint8_t backlog; }; } tcp_api_call_t; +// Given the multithreaded nature of this code, it's possible that pcb has +// been invalidated by the stack thread, but the client thread doesn't know +// yet. Before performing any operation on a pcb, check to make sure we +// are still tracking it. +// AsyncClient guarantees that the _pcb member can only be overwritten by +// the LwIP thread. 
+static inline bool pcb_is_active(tcp_api_call_t &p) { + return (p.pcb_ptr) && (*p.pcb_ptr); +} + static err_t _tcp_output_api(struct tcpip_api_call_data *api_call_msg) { tcp_api_call_t *msg = (tcp_api_call_t *)api_call_msg; msg->err = ERR_CONN; - if (msg->closed_slot == INVALID_CLOSED_SLOT || !_closed_slots[msg->closed_slot]) { - msg->err = tcp_output(msg->pcb); + if (pcb_is_active(*msg)) { + msg->err = tcp_output(*msg->pcb_ptr); } return msg->err; } -static esp_err_t _tcp_output(tcp_pcb *pcb, int8_t closed_slot) { - if (!pcb) { - return ERR_CONN; - } - tcp_api_call_t msg; - msg.pcb = pcb; - msg.closed_slot = closed_slot; - tcpip_api_call(_tcp_output_api, (struct tcpip_api_call_data *)&msg); - return msg.err; -} - static err_t _tcp_write_api(struct tcpip_api_call_data *api_call_msg) { tcp_api_call_t *msg = (tcp_api_call_t *)api_call_msg; msg->err = ERR_CONN; - if (msg->closed_slot == INVALID_CLOSED_SLOT || !_closed_slots[msg->closed_slot]) { - msg->err = tcp_write(msg->pcb, msg->write.data, msg->write.size, msg->write.apiflags); + if (pcb_is_active(*msg)) { + msg->err = tcp_write(*msg->pcb_ptr, msg->write.data, msg->write.size, msg->write.apiflags); } return msg->err; } -static esp_err_t _tcp_write(tcp_pcb *pcb, int8_t closed_slot, const char *data, size_t size, uint8_t apiflags) { - if (!pcb) { - return ERR_CONN; - } - tcp_api_call_t msg; - msg.pcb = pcb; - msg.closed_slot = closed_slot; - msg.write.data = data; - msg.write.size = size; - msg.write.apiflags = apiflags; - tcpip_api_call(_tcp_write_api, (struct tcpip_api_call_data *)&msg); - return msg.err; -} - static err_t _tcp_recved_api(struct tcpip_api_call_data *api_call_msg) { tcp_api_call_t *msg = (tcp_api_call_t *)api_call_msg; msg->err = ERR_CONN; - if (msg->closed_slot == INVALID_CLOSED_SLOT || !_closed_slots[msg->closed_slot]) { - // if(msg->closed_slot != INVALID_CLOSED_SLOT && !_closed_slots[msg->closed_slot]) { - // if(msg->closed_slot != INVALID_CLOSED_SLOT) { + if (pcb_is_active(*msg)) { msg->err = 
0; - tcp_recved(msg->pcb, msg->received); + tcp_recved(*msg->pcb_ptr, msg->received); } return msg->err; } -static esp_err_t _tcp_recved(tcp_pcb *pcb, int8_t closed_slot, size_t len) { - if (!pcb) { - return ERR_CONN; - } - tcp_api_call_t msg; - msg.pcb = pcb; - msg.closed_slot = closed_slot; - msg.received = len; - tcpip_api_call(_tcp_recved_api, (struct tcpip_api_call_data *)&msg); - return msg.err; -} - +// Unlike LwIP's tcp_close, we don't permit failure +// If the pcb can't be closed cleanly, we abort it. static err_t _tcp_close_api(struct tcpip_api_call_data *api_call_msg) { tcp_api_call_t *msg = (tcp_api_call_t *)api_call_msg; msg->err = ERR_CONN; - if (msg->closed_slot == INVALID_CLOSED_SLOT || !_closed_slots[msg->closed_slot]) { - msg->err = tcp_close(msg->pcb); + if (pcb_is_active(*msg)) { + auto pcb = *msg->pcb_ptr; + if (msg->client) { + // Client has requested close; complete teardown + queue_mutex_guard guard; + _free_event(AsyncClient_detail::invalidate_pcb(*msg->client)); + } + msg->err = tcp_close(pcb); + if (msg->err != ERR_OK) { + tcp_abort(pcb); + } + *msg->pcb_ptr = nullptr; } return msg->err; } -static esp_err_t _tcp_close(tcp_pcb *pcb, int8_t closed_slot) { - if (!pcb) { +// Sets *pcb_ptr to nullptr +static esp_err_t _tcp_close(tcp_pcb **pcb_ptr, AsyncClient *client) { + if (!pcb_ptr || !*pcb_ptr) { return ERR_CONN; } tcp_api_call_t msg; - msg.pcb = pcb; - msg.closed_slot = closed_slot; + msg.pcb_ptr = pcb_ptr; + msg.client = client; tcpip_api_call(_tcp_close_api, (struct tcpip_api_call_data *)&msg); return msg.err; } @@ -635,46 +672,40 @@ static esp_err_t _tcp_close(tcp_pcb *pcb, int8_t closed_slot) { static err_t _tcp_abort_api(struct tcpip_api_call_data *api_call_msg) { tcp_api_call_t *msg = (tcp_api_call_t *)api_call_msg; msg->err = ERR_CONN; - if (msg->closed_slot == INVALID_CLOSED_SLOT || !_closed_slots[msg->closed_slot]) { - tcp_abort(msg->pcb); + { + queue_mutex_guard guard; + _remove_events_for(msg->client); + } + if 
(pcb_is_active(*msg)) { + _teardown_pcb(*msg->pcb_ptr); + tcp_abort(*msg->pcb_ptr); + *msg->pcb_ptr = nullptr; } return msg->err; } -static esp_err_t _tcp_abort(tcp_pcb *pcb, int8_t closed_slot) { - if (!pcb) { +// Sets *pcb_ptr to nullptr +static esp_err_t _tcp_abort(tcp_pcb **pcb_ptr, AsyncClient *client) { + if (!pcb_ptr || !*pcb_ptr) { return ERR_CONN; } tcp_api_call_t msg; - msg.pcb = pcb; - msg.closed_slot = closed_slot; + msg.pcb_ptr = pcb_ptr; + msg.client = client; tcpip_api_call(_tcp_abort_api, (struct tcpip_api_call_data *)&msg); + assert(*pcb_ptr == nullptr); // must be true return msg.err; } static err_t _tcp_connect_api(struct tcpip_api_call_data *api_call_msg) { tcp_api_call_t *msg = (tcp_api_call_t *)api_call_msg; - msg->err = tcp_connect(msg->pcb, msg->connect.addr, msg->connect.port, msg->connect.cb); + msg->err = tcp_connect(*msg->pcb_ptr, msg->connect.addr, msg->connect.port, msg->connect.cb); return msg->err; } -static esp_err_t _tcp_connect(tcp_pcb *pcb, int8_t closed_slot, ip_addr_t *addr, uint16_t port, tcp_connected_fn cb) { - if (!pcb) { - return ESP_FAIL; - } - tcp_api_call_t msg; - msg.pcb = pcb; - msg.closed_slot = closed_slot; - msg.connect.addr = addr; - msg.connect.port = port; - msg.connect.cb = cb; - tcpip_api_call(_tcp_connect_api, (struct tcpip_api_call_data *)&msg); - return msg.err; -} - static err_t _tcp_bind_api(struct tcpip_api_call_data *api_call_msg) { tcp_api_call_t *msg = (tcp_api_call_t *)api_call_msg; - msg->err = tcp_bind(msg->pcb, msg->bind.addr, msg->bind.port); + msg->err = tcp_bind(*msg->pcb_ptr, msg->bind.addr, msg->bind.port); return msg->err; } @@ -683,8 +714,7 @@ static esp_err_t _tcp_bind(tcp_pcb *pcb, ip_addr_t *addr, uint16_t port) { return ESP_FAIL; } tcp_api_call_t msg; - msg.pcb = pcb; - msg.closed_slot = -1; + msg.pcb_ptr = &pcb; msg.bind.addr = addr; msg.bind.port = port; tcpip_api_call(_tcp_bind_api, (struct tcpip_api_call_data *)&msg); @@ -694,7 +724,7 @@ static esp_err_t _tcp_bind(tcp_pcb *pcb, 
ip_addr_t *addr, uint16_t port) { static err_t _tcp_listen_api(struct tcpip_api_call_data *api_call_msg) { tcp_api_call_t *msg = (tcp_api_call_t *)api_call_msg; msg->err = 0; - msg->pcb = tcp_listen_with_backlog(msg->pcb, msg->backlog); + *msg->pcb_ptr = tcp_listen_with_backlog(*msg->pcb_ptr, msg->backlog); return msg->err; } @@ -703,11 +733,10 @@ static tcp_pcb *_tcp_listen_with_backlog(tcp_pcb *pcb, uint8_t backlog) { return NULL; } tcp_api_call_t msg; - msg.pcb = pcb; - msg.closed_slot = -1; + msg.pcb_ptr = &pcb; msg.backlog = backlog ? backlog : 0xFF; tcpip_api_call(_tcp_listen_api, (struct tcpip_api_call_data *)&msg); - return msg.pcb; + return pcb; } /* @@ -715,70 +744,41 @@ static tcp_pcb *_tcp_listen_with_backlog(tcp_pcb *pcb, uint8_t backlog) { */ AsyncClient::AsyncClient(tcp_pcb *pcb) - : _connect_cb(0), _connect_cb_arg(0), _discard_cb(0), _discard_cb_arg(0), _sent_cb(0), _sent_cb_arg(0), _error_cb(0), _error_cb_arg(0), _recv_cb(0), - _recv_cb_arg(0), _pb_cb(0), _pb_cb_arg(0), _timeout_cb(0), _timeout_cb_arg(0), _poll_cb(0), _poll_cb_arg(0), _ack_pcb(true), _tx_last_packet(0), - _rx_timeout(0), _rx_last_ack(0), _ack_timeout(CONFIG_ASYNC_TCP_MAX_ACK_TIME), _connect_port(0), prev(NULL), next(NULL) { - _pcb = pcb; - _closed_slot = INVALID_CLOSED_SLOT; + : _pcb(pcb), _end_event(nullptr), _needs_discard(pcb != nullptr), _polls_pending(0), _sent_pending(0), +#ifdef CONFIG_ASYNC_TCP_COALESCE_RECV + _recv_pending(nullptr), +#endif + _connect_cb(0), _connect_cb_arg(0), _discard_cb(0), _discard_cb_arg(0), _sent_cb(0), _sent_cb_arg(0), _error_cb(0), _error_cb_arg(0), _recv_cb(0), + _recv_cb_arg(0), _pb_cb(0), _pb_cb_arg(0), _timeout_cb(0), _timeout_cb_arg(0), _ack_pcb(true), _tx_last_packet(0), _rx_timeout(0), _rx_last_ack(0), + _ack_timeout(CONFIG_ASYNC_TCP_MAX_ACK_TIME), _connect_port(0) { if (_pcb) { + tcp_core_guard tg; + _end_event = _register_pcb(_pcb, this); _rx_last_packet = millis(); - tcp_arg(_pcb, this); - tcp_recv(_pcb, &_tcp_recv); - tcp_sent(_pcb, 
&_tcp_sent); - tcp_err(_pcb, &_tcp_error); - tcp_poll(_pcb, &_tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER); - if (!_allocate_closed_slot()) { - _close(); + if (!_end_event) { + // Out of memory!! + log_e("Unable to allocate error event"); + // Swallow this PCB, producing a null client object + tcp_abort(_pcb); + _pcb = nullptr; + _needs_discard = false; } } + DEBUG_PRINTF("+AC: 0x%08x -> 0x%08x [0x%08x]", _pcb, (intptr_t)this, (intptr_t)_end_event); } AsyncClient::~AsyncClient() { + DEBUG_PRINTF("-AC: 0x%08x -> 0x%08x [0x%08x]", _pcb, (intptr_t)this, (intptr_t)_end_event); if (_pcb) { _close(); } - _free_closed_slot(); -} - -/* - * Operators - * */ - -AsyncClient &AsyncClient::operator=(const AsyncClient &other) { - if (_pcb) { - _close(); - } - - _pcb = other._pcb; - _closed_slot = other._closed_slot; - if (_pcb) { - _rx_last_packet = millis(); - tcp_arg(_pcb, this); - tcp_recv(_pcb, &_tcp_recv); - tcp_sent(_pcb, &_tcp_sent); - tcp_err(_pcb, &_tcp_error); - tcp_poll(_pcb, &_tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER); - } - return *this; -} - -bool AsyncClient::operator==(const AsyncClient &other) const { - return _pcb == other._pcb; -} - -AsyncClient &AsyncClient::operator+=(const AsyncClient &other) { - if (next == NULL) { - next = (AsyncClient *)(&other); - next->prev = this; - } else { - AsyncClient *c = next; - while (c->next != NULL) { - c = c->next; - } - c->next = (AsyncClient *)(&other); - c->next->prev = c; - } - return *this; + assert(!_end_event); // should have been cleaned up by _close +#ifdef CONFIG_ASYNC_TCP_COALESCE_RECV + assert(!_recv_pending); +#endif + assert(_needs_discard == false); // If we needed the discard callback, it must have been called by now + // We take care to clear this flag before calling the discard callback + // as the discard callback commonly destructs the AsyncClient object. 
} /* @@ -829,7 +829,7 @@ void AsyncClient::onPoll(AcConnectHandler cb, void *arg) { * Main Public Methods * */ -bool AsyncClient::_connect(ip_addr_t addr, uint16_t port) { +bool AsyncClient::_connect(const ip_addr_t &addr, uint16_t port) { if (_pcb) { log_d("already connected, state %d", _pcb->state); return false; @@ -839,27 +839,31 @@ bool AsyncClient::_connect(ip_addr_t addr, uint16_t port) { return false; } - if (!_allocate_closed_slot()) { - log_e("failed to allocate: closed slot full"); - return false; - } + { + tcp_core_guard tg; + _pcb = tcp_new_ip_type(addr.type); + if (!_pcb) { + log_e("pcb == NULL"); + return false; + } + _end_event = _register_pcb(_pcb, this); - TCP_MUTEX_LOCK(); - tcp_pcb *pcb = tcp_new_ip_type(addr.type); - if (!pcb) { - TCP_MUTEX_UNLOCK(); - log_e("pcb == NULL"); - return false; + if (!_end_event) { + log_e("Unable to allocate event"); + tcp_abort(_pcb); + _pcb = nullptr; + return false; + } + _needs_discard = true; } - tcp_arg(pcb, this); - tcp_err(pcb, &_tcp_error); - tcp_recv(pcb, &_tcp_recv); - tcp_sent(pcb, &_tcp_sent); - tcp_poll(pcb, &_tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER); - TCP_MUTEX_UNLOCK(); - esp_err_t err = _tcp_connect(pcb, _closed_slot, &addr, port, (tcp_connected_fn)&_tcp_connected); - return err == ESP_OK; + tcp_api_call_t msg; + msg.pcb_ptr = &_pcb; + msg.connect.addr = &addr; + msg.connect.port = port; + msg.connect.cb = (tcp_connected_fn)&_tcp_connected; + tcpip_api_call(_tcp_connect_api, (struct tcpip_api_call_data *)&msg); + return msg.err == ESP_OK; } bool AsyncClient::connect(const IPAddress &ip, uint16_t port) { @@ -891,9 +895,12 @@ bool AsyncClient::connect(const char *host, uint16_t port) { return false; } - TCP_MUTEX_LOCK(); - err_t err = dns_gethostbyname(host, &addr, (dns_found_callback)&_tcp_dns_found, this); - TCP_MUTEX_UNLOCK(); + err_t err; + { + tcp_core_guard tg; + err = dns_gethostbyname(host, &addr, (dns_found_callback)&_tcp_dns_found, this); + } + if (err == ERR_OK) { #if ESP_IDF_VERSION_MAJOR 
< 5 #if LWIP_IPV6 @@ -917,15 +924,15 @@ bool AsyncClient::connect(const char *host, uint16_t port) { void AsyncClient::close(bool now) { if (_pcb) { - _tcp_recved(_pcb, _closed_slot, _rx_ack_len); + _recved(_rx_ack_len); } _close(); } int8_t AsyncClient::abort() { if (_pcb) { - _tcp_abort(_pcb, _closed_slot); - _pcb = NULL; + _tcp_abort(&_pcb, this); + _needs_discard = false; } return ERR_ABRT; } @@ -945,19 +952,29 @@ size_t AsyncClient::add(const char *data, size_t size, uint8_t apiflags) { if (!room) { return 0; } - size_t will_send = (room < size) ? room : size; - int8_t err = ERR_OK; - err = _tcp_write(_pcb, _closed_slot, data, will_send, apiflags); - if (err != ERR_OK) { + tcp_api_call_t msg; + msg.pcb_ptr = &_pcb; + msg.err = ERR_OK; + msg.write.data = data; + msg.write.size = std::min(room, size); + msg.write.apiflags = apiflags; + tcpip_api_call(_tcp_write_api, (struct tcpip_api_call_data *)&msg); + if (msg.err != ERR_OK) { return 0; } - return will_send; + return msg.write.size; } bool AsyncClient::send() { auto backup = _tx_last_packet; _tx_last_packet = millis(); - if (_tcp_output(_pcb, _closed_slot) == ERR_OK) { + if (!_pcb) { + return false; + } + tcp_api_call_t msg; + msg.pcb_ptr = &_pcb; + tcpip_api_call(_tcp_output_api, (struct tcpip_api_call_data *)&msg); + if (msg.err == ERR_OK) { return true; } _tx_last_packet = backup; @@ -969,7 +986,7 @@ size_t AsyncClient::ack(size_t len) { len = _rx_ack_len; } if (len) { - _tcp_recved(_pcb, _closed_slot, len); + _recved(len); } _rx_ack_len -= len; return len; @@ -979,7 +996,7 @@ void AsyncClient::ackPacket(struct pbuf *pb) { if (!pb) { return; } - _tcp_recved(_pcb, _closed_slot, pb->len); + _recved(pb->len); pbuf_free(pb); } @@ -988,71 +1005,23 @@ void AsyncClient::ackPacket(struct pbuf *pb) { * */ int8_t AsyncClient::_close() { - // ets_printf("X: 0x%08x\n", (uint32_t)this); - int8_t err = ERR_OK; - if (_pcb) { - TCP_MUTEX_LOCK(); - tcp_arg(_pcb, NULL); - tcp_sent(_pcb, NULL); - tcp_recv(_pcb, NULL); - 
tcp_err(_pcb, NULL); - tcp_poll(_pcb, NULL, 0); - TCP_MUTEX_UNLOCK(); - _tcp_clear_events(this); - err = _tcp_close(_pcb, _closed_slot); - if (err != ERR_OK) { - err = abort(); - } - _free_closed_slot(); - _pcb = NULL; - if (_discard_cb) { - _discard_cb(_discard_cb_arg, this); - } + DEBUG_PRINTF("close: 0x%08x -> 0x%08x, %d", (uint32_t)this, _pcb, _needs_discard); + // We always call close, regardless of current state + int8_t err = _tcp_close(&_pcb, this); + auto call_discard = _needs_discard; + _needs_discard = false; + if (call_discard && _discard_cb) { + _discard_cb(_discard_cb_arg, this); } return err; } -bool AsyncClient::_allocate_closed_slot() { - bool allocated = false; - if (xSemaphoreTake(_slots_lock, portMAX_DELAY) == pdTRUE) { - uint32_t closed_slot_min_index = 0; - allocated = _closed_slot != INVALID_CLOSED_SLOT; - if (!allocated) { - for (int i = 0; i < _number_of_closed_slots; ++i) { - if ((_closed_slot == INVALID_CLOSED_SLOT || _closed_slots[i] <= closed_slot_min_index) && _closed_slots[i] != 0) { - closed_slot_min_index = _closed_slots[i]; - _closed_slot = i; - } - } - allocated = _closed_slot != INVALID_CLOSED_SLOT; - if (allocated) { - _closed_slots[_closed_slot] = 0; - } - } - xSemaphoreGive(_slots_lock); - } - return allocated; -} - -void AsyncClient::_free_closed_slot() { - xSemaphoreTake(_slots_lock, portMAX_DELAY); - if (_closed_slot != INVALID_CLOSED_SLOT) { - _closed_slots[_closed_slot] = _closed_index; - _closed_slot = INVALID_CLOSED_SLOT; - ++_closed_index; - } - xSemaphoreGive(_slots_lock); -} - /* * Private Callbacks * */ -int8_t AsyncClient::_connected(tcp_pcb *pcb, int8_t err) { - _pcb = reinterpret_cast(pcb); - if (_pcb) { - _rx_last_packet = millis(); - } +int8_t AsyncClient::_connected(int8_t err) { + _rx_last_packet = millis(); if (_connect_cb) { _connect_cb(_connect_cb_arg, this); } @@ -1060,45 +1029,26 @@ int8_t AsyncClient::_connected(tcp_pcb *pcb, int8_t err) { } void AsyncClient::_error(int8_t err) { + 
assert(_needs_discard); if (_error_cb) { _error_cb(_error_cb_arg, this, err); } + _needs_discard = false; if (_discard_cb) { _discard_cb(_discard_cb_arg, this); } } -// In LwIP Thread -int8_t AsyncClient::_lwip_fin(tcp_pcb *pcb, int8_t err) { - if (!_pcb || pcb != _pcb) { - log_d("0x%08x != 0x%08x", (uint32_t)pcb, (uint32_t)_pcb); - return ERR_OK; - } - tcp_arg(_pcb, NULL); - if (_pcb->state == LISTEN) { - tcp_sent(_pcb, NULL); - tcp_recv(_pcb, NULL); - tcp_err(_pcb, NULL); - tcp_poll(_pcb, NULL, 0); - } - if (tcp_close(_pcb) != ERR_OK) { - tcp_abort(_pcb); - } - _free_closed_slot(); - _pcb = NULL; - return ERR_OK; -} - // In Async Thread -int8_t AsyncClient::_fin(tcp_pcb *pcb, int8_t err) { - _tcp_clear_events(this); - if (_discard_cb) { - _discard_cb(_discard_cb_arg, this); - } +int8_t AsyncClient::_fin(int8_t err) { + // WM: This isn't strictly correct -- we should instead pass this to a callback + // _fin() merely indicates that the remote end is closing, it doesn't require us + // to close until we're done sending. 
+ _close(); return ERR_OK; } -int8_t AsyncClient::_sent(tcp_pcb *pcb, uint16_t len) { +int8_t AsyncClient::_sent(uint16_t len) { _rx_last_ack = _rx_last_packet = millis(); if (_sent_cb) { _sent_cb(_sent_cb_arg, this, len, (_rx_last_packet - _tx_last_packet)); @@ -1106,7 +1056,7 @@ int8_t AsyncClient::_sent(tcp_pcb *pcb, uint16_t len) { return ERR_OK; } -int8_t AsyncClient::_recv(tcp_pcb *pcb, pbuf *pb, int8_t err) { +int8_t AsyncClient::_recv(pbuf *pb, int8_t err) { while (pb != NULL) { _rx_last_packet = millis(); // we should not ack before we assimilate the data @@ -1123,7 +1073,7 @@ int8_t AsyncClient::_recv(tcp_pcb *pcb, pbuf *pb, int8_t err) { if (!_ack_pcb) { _rx_ack_len += b->len; } else if (_pcb) { - _tcp_recved(_pcb, _closed_slot, b->len); + _recved(b->len); } } pbuf_free(b); @@ -1131,16 +1081,7 @@ int8_t AsyncClient::_recv(tcp_pcb *pcb, pbuf *pb, int8_t err) { return ERR_OK; } -int8_t AsyncClient::_poll(tcp_pcb *pcb) { - if (!_pcb) { - // log_d("pcb is NULL"); - return ERR_OK; - } - if (pcb != _pcb) { - log_d("0x%08x != 0x%08x", (uint32_t)pcb, (uint32_t)_pcb); - return ERR_OK; - } - +int8_t AsyncClient::_poll() { uint32_t now = millis(); // ACK Timeout @@ -1148,7 +1089,7 @@ int8_t AsyncClient::_poll(tcp_pcb *pcb) { const uint32_t one_day = 86400000; bool last_tx_is_after_last_ack = (_rx_last_ack - _tx_last_packet + one_day) < one_day; if (last_tx_is_after_last_ack && (now - _tx_last_packet) >= _ack_timeout) { - log_d("ack timeout %d", pcb->state); + log_d("ack timeout %d", _pcb->state); if (_timeout_cb) { _timeout_cb(_timeout_cb_arg, this, (now - _tx_last_packet)); } @@ -1157,7 +1098,7 @@ int8_t AsyncClient::_poll(tcp_pcb *pcb) { } // RX Timeout if (_rx_timeout && (now - _rx_last_packet) >= (_rx_timeout * 1000)) { - log_d("rx timeout %d", pcb->state); + log_d("rx timeout %d", _pcb->state); _close(); return ERR_OK; } @@ -1183,13 +1124,20 @@ void AsyncClient::_dns_found(struct ip_addr *ipaddr) { connect(ip, _connect_port); #endif } else { - if (_error_cb) { 
- _error_cb(_error_cb_arg, this, -55); - } - if (_discard_cb) { - _discard_cb(_discard_cb_arg, this); - } + // No address found + this->_error(-55); + } +} + +int8_t AsyncClient::_recved(size_t len) { + if (!_pcb) { + return ERR_CONN; } + tcp_api_call_t msg; + msg.pcb_ptr = &_pcb; + msg.received = len; + tcpip_api_call(_tcp_recved_api, (struct tcpip_api_call_data *)&msg); + return msg.err; } /* @@ -1249,6 +1197,9 @@ bool AsyncClient::getNoDelay() { } void AsyncClient::setKeepAlive(uint32_t ms, uint8_t cnt) { + if (!_pcb) { + return; + } if (ms != 0) { _pcb->so_options |= SOF_KEEPALIVE; // Turn on TCP Keepalive for the given pcb // Set the time between keepalive messages in milli-seconds @@ -1470,42 +1421,6 @@ const char *AsyncClient::stateToString() const { } } -/* - * Static Callbacks (LwIP C2C++ interconnect) - * */ - -void AsyncClient::_s_dns_found(const char *name, struct ip_addr *ipaddr, void *arg) { - reinterpret_cast(arg)->_dns_found(ipaddr); -} - -int8_t AsyncClient::_s_poll(void *arg, struct tcp_pcb *pcb) { - return reinterpret_cast(arg)->_poll(pcb); -} - -int8_t AsyncClient::_s_recv(void *arg, struct tcp_pcb *pcb, struct pbuf *pb, int8_t err) { - return reinterpret_cast(arg)->_recv(pcb, pb, err); -} - -int8_t AsyncClient::_s_fin(void *arg, struct tcp_pcb *pcb, int8_t err) { - return reinterpret_cast(arg)->_fin(pcb, err); -} - -int8_t AsyncClient::_s_lwip_fin(void *arg, struct tcp_pcb *pcb, int8_t err) { - return reinterpret_cast(arg)->_lwip_fin(pcb, err); -} - -int8_t AsyncClient::_s_sent(void *arg, struct tcp_pcb *pcb, uint16_t len) { - return reinterpret_cast(arg)->_sent(pcb, len); -} - -void AsyncClient::_s_error(void *arg, int8_t err) { - reinterpret_cast(arg)->_error(err); -} - -int8_t AsyncClient::_s_connected(void *arg, struct tcp_pcb *pcb, int8_t err) { - return reinterpret_cast(arg)->_connected(pcb, err); -} - /* Async TCP Server */ @@ -1557,9 +1472,10 @@ void AsyncServer::begin() { return; } int8_t err; - TCP_MUTEX_LOCK(); - _pcb = 
tcp_new_ip_type(_bind4 && _bind6 ? IPADDR_TYPE_ANY : (_bind6 ? IPADDR_TYPE_V6 : IPADDR_TYPE_V4)); - TCP_MUTEX_UNLOCK(); + { + tcp_core_guard tg; + _pcb = tcp_new_ip_type(_bind4 && _bind6 ? IPADDR_TYPE_ANY : (_bind6 ? IPADDR_TYPE_V6 : IPADDR_TYPE_V4)); + }; if (!_pcb) { log_e("_pcb == NULL"); return; @@ -1580,8 +1496,7 @@ void AsyncServer::begin() { err = _tcp_bind(_pcb, &local_addr, _port); if (err != ERR_OK) { - _tcp_close(_pcb, -1); - _pcb = NULL; + _tcp_close(&_pcb, nullptr); log_e("bind error: %d", err); return; } @@ -1592,22 +1507,18 @@ void AsyncServer::begin() { log_e("listen_pcb == NULL"); return; } - TCP_MUTEX_LOCK(); + tcp_core_guard tg; tcp_arg(_pcb, (void *)this); tcp_accept(_pcb, &_s_accept); - TCP_MUTEX_UNLOCK(); } void AsyncServer::end() { if (_pcb) { - TCP_MUTEX_LOCK(); + tcp_core_guard tg; tcp_arg(_pcb, NULL); tcp_accept(_pcb, NULL); if (tcp_close(_pcb) != ERR_OK) { - TCP_MUTEX_UNLOCK(); - _tcp_abort(_pcb, -1); - } else { - TCP_MUTEX_UNLOCK(); + tcp_abort(_pcb); } _pcb = NULL; } @@ -1615,10 +1526,12 @@ void AsyncServer::end() { // runs on LwIP thread int8_t AsyncServer::_accept(tcp_pcb *pcb, int8_t err) { + DEBUG_PRINTF("+A: 0x%08x %d", pcb, err); if (!pcb) { log_e("_accept failed: pcb is NULL"); return ERR_ABRT; } + if (_connect_cb) { AsyncClient *c = new (std::nothrow) AsyncClient(pcb); if (c && c->pcb()) { @@ -1626,25 +1539,19 @@ int8_t AsyncServer::_accept(tcp_pcb *pcb, int8_t err) { if (_tcp_accept(this, c) == ERR_OK) { return ERR_OK; // success } - // Couldn't allocate accept event - // We can't let the client object call in to close, as we're on the LWIP thread; it could deadlock trying to RPC to itself - c->_pcb = nullptr; - tcp_abort(pcb); - log_e("_accept failed: couldn't accept client"); - return ERR_ABRT; } if (c) { // Couldn't complete setup // pcb has already been aborted delete c; - pcb = nullptr; - log_e("_accept failed: couldn't complete setup"); + log_e("_accept failed: couldn't accept client"); return ERR_ABRT; } log_e("_accept 
failed: couldn't allocate client"); } else { log_e("_accept failed: no onConnect callback"); } + tcp_abort(pcb); return ERR_OK; } diff --git a/src/AsyncTCP.h b/src/AsyncTCP.h index 7715ec3..91ebece 100644 --- a/src/AsyncTCP.h +++ b/src/AsyncTCP.h @@ -13,6 +13,7 @@ #endif #include "lwip/ip6_addr.h" #include "lwip/ip_addr.h" +#include #include #ifndef LIBRETINY @@ -47,10 +48,6 @@ extern "C" { #define CONFIG_ASYNC_TCP_PRIORITY 10 #endif -#ifndef CONFIG_ASYNC_TCP_QUEUE_SIZE -#define CONFIG_ASYNC_TCP_QUEUE_SIZE 64 -#endif - #ifndef CONFIG_ASYNC_TCP_MAX_ACK_TIME #define CONFIG_ASYNC_TCP_MAX_ACK_TIME 5000 #endif @@ -69,20 +66,29 @@ typedef std::function AcTimeoutHandl struct tcp_pcb; struct ip_addr; +class AsyncClient_detail; +struct lwip_tcp_event_packet_t; class AsyncClient { public: AsyncClient(tcp_pcb *pcb = 0); ~AsyncClient(); - AsyncClient &operator=(const AsyncClient &other); - AsyncClient &operator+=(const AsyncClient &other); + // Noncopyable + AsyncClient(const AsyncClient &) = delete; + AsyncClient &operator=(const AsyncClient &) = delete; - bool operator==(const AsyncClient &other) const; + // Nonmovable + AsyncClient(AsyncClient &&) = delete; + AsyncClient &operator=(AsyncClient &&) = delete; - bool operator!=(const AsyncClient &other) const { + inline bool operator==(const AsyncClient &other) const { + return _pcb == other._pcb; + } + inline bool operator!=(const AsyncClient &other) const { return !(*this == other); } + bool connect(const IPAddress &ip, uint16_t port); #if ESP_IDF_VERSION_MAJOR < 5 bool connect(const IPv6Address &ip, uint16_t port); @@ -90,6 +96,7 @@ class AsyncClient { bool connect(const char *host, uint16_t port); /** * @brief close connection + * @note will call onDisconnect callback * * @param now - ignored */ @@ -98,6 +105,13 @@ class AsyncClient { void stop() { close(false); }; + + /** + * @brief abort connection + * @note does NOT call onDisconnect callback! 
+ * + * @return always returns ERR_ABRT + */ int8_t abort(); bool free(); @@ -211,7 +225,6 @@ class AsyncClient { // set callback - data received (called if onPacket is not used) void onData(AcDataHandler cb, void *arg = 0); // set callback - data received - // !!! You MUST call ackPacket() or free the pbuf yourself to prevent memory leaks void onPacket(AcPacketHandler cb, void *arg = 0); // set callback - ack timeout void onTimeout(AcTimeoutHandler cb, void *arg = 0); @@ -230,29 +243,19 @@ class AsyncClient { static const char *errorToString(int8_t error); const char *stateToString() const; - // internal callbacks - Do NOT call any of the functions below in user code! - static int8_t _s_poll(void *arg, struct tcp_pcb *tpcb); - static int8_t _s_recv(void *arg, struct tcp_pcb *tpcb, struct pbuf *pb, int8_t err); - static int8_t _s_fin(void *arg, struct tcp_pcb *tpcb, int8_t err); - static int8_t _s_lwip_fin(void *arg, struct tcp_pcb *tpcb, int8_t err); - static void _s_error(void *arg, int8_t err); - static int8_t _s_sent(void *arg, struct tcp_pcb *tpcb, uint16_t len); - static int8_t _s_connected(void *arg, struct tcp_pcb *tpcb, int8_t err); - static void _s_dns_found(const char *name, struct ip_addr *ipaddr, void *arg); - static void _tcp_error(void *arg, int8_t err); - - int8_t _recv(tcp_pcb *pcb, pbuf *pb, int8_t err); tcp_pcb *pcb() { return _pcb; } protected: - friend class AsyncServer; - - bool _connect(ip_addr_t addr, uint16_t port); - tcp_pcb *_pcb; - int8_t _closed_slot; + lwip_tcp_event_packet_t *_end_event; + bool _needs_discard; + std::atomic _polls_pending; + std::atomic _sent_pending; +#ifdef CONFIG_ASYNC_TCP_COALESCE_RECV + struct lwip_tcp_event_packet_t *_recv_pending; +#endif AcConnectHandler _connect_cb; void *_connect_cb_arg; @@ -280,20 +283,18 @@ class AsyncClient { uint32_t _ack_timeout; uint16_t _connect_port; + friend class AsyncClient_detail; + bool _connect(const ip_addr_t &addr, uint16_t port); int8_t _close(); - void _free_closed_slot(); 
- bool _allocate_closed_slot(); - int8_t _connected(tcp_pcb *pcb, int8_t err); + int8_t _connected(int8_t err); void _error(int8_t err); - int8_t _poll(tcp_pcb *pcb); - int8_t _sent(tcp_pcb *pcb, uint16_t len); - int8_t _fin(tcp_pcb *pcb, int8_t err); + int8_t _poll(); + int8_t _sent(uint16_t len); + int8_t _fin(int8_t err); int8_t _lwip_fin(tcp_pcb *pcb, int8_t err); + int8_t _recv(pbuf *pb, int8_t err); void _dns_found(struct ip_addr *ipaddr); - -public: - AsyncClient *prev; - AsyncClient *next; + int8_t _recved(size_t len); }; class AsyncServer { @@ -304,6 +305,15 @@ class AsyncServer { #endif AsyncServer(uint16_t port); ~AsyncServer(); + + // Noncopyable + AsyncServer(const AsyncServer &) = delete; + AsyncServer &operator=(const AsyncServer &) = delete; + + // Nonmovable + AsyncServer(AsyncServer &&) = delete; + AsyncServer &operator=(AsyncServer &&) = delete; + void onClient(AcConnectHandler cb, void *arg); void begin(); void end(); @@ -328,6 +338,7 @@ class AsyncServer { AcConnectHandler _connect_cb; void *_connect_cb_arg; + friend class AsyncClient_detail; int8_t _accept(tcp_pcb *newpcb, int8_t err); int8_t _accepted(AsyncClient *client); }; diff --git a/src/simple_intrusive_list.h b/src/simple_intrusive_list.h new file mode 100644 index 0000000..29d1c0f --- /dev/null +++ b/src/simple_intrusive_list.h @@ -0,0 +1,129 @@ +// Simple intrusive list class + +#pragma once + +template class simple_intrusive_list { + static_assert(std::is_same().next), T *>::value, "Template type must have public 'T* next' member"); + +public: + typedef T value_type; + typedef value_type *value_ptr_type; + typedef value_ptr_type *value_ptr_ptr_type; + + // Static utility methods + static size_t list_size(value_ptr_type chain) { + size_t count = 0; + for (auto c = chain; c != nullptr; c = c->next) { + ++count; + } + return count; + } + + static void delete_list(value_ptr_type chain) { + while (chain) { + auto t = chain; + chain = chain->next; + delete t; + } + } + +public: + // 
Object methods + + simple_intrusive_list() : _head(nullptr), _tail(&_head) {} + ~simple_intrusive_list() { + clear(); + } + + // Noncopyable, nonmovable + simple_intrusive_list(const simple_intrusive_list &) = delete; + simple_intrusive_list(simple_intrusive_list &&) = delete; + simple_intrusive_list &operator=(const simple_intrusive_list &) = delete; + simple_intrusive_list &operator=(simple_intrusive_list &&) = delete; + + inline void push_back(value_ptr_type obj) { + if (obj) { + *_tail = obj; + _tail = &obj->next; + } + } + + inline void push_front(value_ptr_type obj) { + if (obj) { + if (_head == nullptr) { + _tail = &obj->next; + } + obj->next = _head; + _head = obj; + } + } + + inline value_ptr_type pop_front() { + auto rv = _head; + if (_head) { + if (_tail == &_head->next) { + _tail = &_head; + } + _head = _head->next; + } + return rv; + } + + inline void clear() { + // Assumes all elements were allocated with "new" + delete_list(_head); + _head = nullptr; + _tail = &_head; + } + + inline size_t size() const { + return list_size(_head); + } + + template inline value_ptr_type remove_if(const function_type &condition) { + value_ptr_type removed = nullptr; + value_ptr_ptr_type current_ptr = &_head; + while (*current_ptr != nullptr) { + value_ptr_type current = *current_ptr; + if (condition(*current)) { + // Remove this item from the list by moving the next item in + *current_ptr = current->next; + // If we were the last item, reset tail + if (current->next == nullptr) { + _tail = current_ptr; + } + // Prepend this item to the removed list + current->next = removed; + removed = current; + // do not advance current_ptr + } else { + // advance current_ptr + current_ptr = &(*current_ptr)->next; + } + } + + // Return the removed entries + return removed; + } + + inline value_ptr_type begin() const { + return _head; + } + + bool validate_tail() const { + if (_head == nullptr) { + return (_tail == &_head); + } + auto p = _head; + while (p->next != nullptr) { + p = 
p->next; + } + return _tail == &p->next; + } + +private: + // Data members + value_ptr_type _head; + value_ptr_ptr_type _tail; + +}; // class simple_intrusive_list